Commit 0cdbf0ea authored by Olivier Cros's avatar Olivier Cros

Implementing ipv6 on neo

In order to synchronise neo with slapos, it has to work perfectly with ipv4
and ipv6. This allows to integrate neo in erp5 and to prepare different buildout
installations of neo.
The protocol and connectors are no more generic but can now support IPv4 and
IPv6 connections. We adopted a specific way of development which allow to
easily add new protocols in the future.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2654 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c4d7ac19
......@@ -56,18 +56,15 @@ class Application(object):
"""The storage node application."""
def __init__(self, config):
# always use default connector for now
self.connector_handler = getConnectorHandler()
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.name = config.getCluster()
self.server = config.getBind()
self.master_addresses = config.getMasters()
self.master_addresses, connector_name = config.getMasters()
self.connector_handler = getConnectorHandler(connector_name)
neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))
# The partition table is initialized after getting the number of
......
......@@ -77,7 +77,8 @@ class Application(object):
# Internal Attributes common to all thread
self._db = None
self.name = name
self.connector_handler = getConnectorHandler(connector)
master_addresses, connector_name = parseMasterList(master_nodes)
self.connector_handler = getConnectorHandler(connector_name)
self.dispatcher = Dispatcher(self.poll_thread)
self.nm = NodeManager()
self.cp = ConnectionPool(self)
......@@ -87,7 +88,7 @@ class Application(object):
self.trying_master_node = None
# load master node list
for address in parseMasterList(master_nodes):
for address in master_addresses:
self.nm.createMaster(address=address)
# no self-assigned UUID, primary master will supply us one
......
......@@ -17,7 +17,7 @@
from ConfigParser import SafeConfigParser
from neo.lib import util
from neo.lib.util import parseNodeAddress
class ConfigurationManager(object):
"""
......@@ -48,20 +48,13 @@ class ConfigurationManager(object):
def getMasters(self):
""" Get the master node list except itself """
masters = self.__get('masters')
if not masters:
return []
# lod master node list except itself
return util.parseMasterList(masters, except_node=self.getBind())
def getBind(self):
""" Get the address to bind to """
bind = self.__get('bind')
if ':' in bind:
(ip, port) = bind.split(':')
else:
(ip, port) = (bind, 0)
ip = util.resolve(ip)
return (ip, int(port))
return parseNodeAddress(bind, 0)
def getDatabase(self):
return self.__get('database')
......
......@@ -22,7 +22,7 @@ import errno
# Fill by calling registerConnectorHandler.
# Read by calling getConnectorHandler.
connector_registry = {}
DEFAULT_CONNECTOR = 'SocketConnector'
DEFAULT_CONNECTOR = 'SocketConnectorIPv4'
def registerConnectorHandler(connector_handler):
connector_registry[connector_handler.__name__] = connector_handler
......@@ -52,7 +52,7 @@ class SocketConnector:
self.is_listening = False
self.is_closed = False
if s is None:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket = socket.socket(self.af_type, socket.SOCK_STREAM)
else:
self.socket = s
self.socket_fd = self.socket.fileno()
......@@ -90,7 +90,7 @@ class SocketConnector:
return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
def getAddress(self):
return self.socket.getsockname()
raise NotImplementedError
def getDescriptor(self):
# this descriptor must only be used by the event manager, where it
......@@ -100,9 +100,9 @@ class SocketConnector:
def getNewConnection(self):
try:
new_s, addr = self.socket.accept()
new_s = SocketConnector(new_s, accepted_from=addr)
return new_s, addr
(new_s, addr) = self._accept()
new_s = self.__class__(new_s, accepted_from=addr)
return (new_s, addr)
except socket.error, (err, errmsg):
if err == errno.EAGAIN:
raise ConnectorTryAgainException
......@@ -166,7 +166,35 @@ class SocketConnector:
result += ' %s' % (self.remote_addr, )
return result + '>'
registerConnectorHandler(SocketConnector)
def _accept(self):
raise NotImplementedError
class SocketConnectorIPv4(SocketConnector):
" Wrapper for IPv4 sockets"
af_type = socket.AF_INET
def _accept(self):
return self.socket.accept()
def getAddress(self):
return self.socket.getsockname()
class SocketConnectorIPv6(SocketConnector):
" Wrapper for IPv6 sockets"
af_type = socket.AF_INET6
def _accept(self):
new_s, addr = self.socket.accept()
addr = (addr[0], addr[1])
return (new_s, addr)
def getAddress(self):
addr = self.socket.getsockname()
addr = (addr[0], addr[1])
return addr
registerConnectorHandler(SocketConnectorIPv4)
registerConnectorHandler(SocketConnectorIPv6)
class ConnectorException(Exception):
pass
......
......@@ -15,13 +15,14 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import socket
import sys
import traceback
from types import ClassType
from socket import inet_ntoa, inet_aton
from cStringIO import StringIO
from neo.lib.util import Enum, Struct
from neo.lib.util import Enum, Struct, getAddressType
# The protocol version (major, minor).
PROTOCOL_VERSION = (4, 1)
......@@ -391,25 +392,60 @@ class PEnum(PStructItem):
enum = self._enum.__class__.__name__
raise ValueError, 'Invalid code for %s enum: %r' % (enum, code)
class PAddressIPGeneric(PStructItem):
def __init__(self, name, format):
PStructItem.__init__(self, name, format)
def encode(self, writer, address):
host, port = address
host = socket.inet_pton(self.af_type, host)
writer(self.pack(host, port))
def decode(self, reader):
data = reader(self.size)
address = self.unpack(data)
host, port = address
host = socket.inet_ntop(self.af_type, host)
return (host, port)
class PAddressIPv4(PAddressIPGeneric):
af_type = socket.AF_INET
def __init__(self, name):
PAddressIPGeneric.__init__(self, name, '!4sH')
class PAddressIPv6(PAddressIPGeneric):
af_type = socket.AF_INET6
def __init__(self, name):
PAddressIPGeneric.__init__(self, name, '!16sH')
class PAddress(PStructItem):
"""
An host address (IPv4 for now)
An host address (IPv4/IPv6)
"""
address_format_dict = {
socket.AF_INET: PAddressIPv4('ipv4'),
socket.AF_INET6: PAddressIPv6('ipv6'),
}
def __init__(self, name):
PStructItem.__init__(self, name, '!4sH')
PStructItem.__init__(self, name, '!L')
def _encode(self, writer, address):
if address is None:
address = INVALID_ADDRESS
assert len(address) == 2, address
host, port = address
host = inet_aton(host)
writer(self.pack(host, port))
af_type = getAddressType(address)
writer(self.pack(af_type))
encoder = self.address_format_dict[af_type]
encoder.encode(writer, address)
def _decode(self, reader):
data = reader(self.size)
host, port = self.unpack(data)
host = inet_ntoa(host)
af_type = self.unpack(reader(self.size))[0]
decoder = self.address_format_dict[af_type]
host, port = decoder.decode(reader)
if (host, port) == INVALID_ADDRESS:
return None
return (host, port)
......
......@@ -22,6 +22,11 @@ from zlib import adler32
from Queue import deque
from struct import pack, unpack
SOCKET_CONNECTORS_DICT = {
socket.AF_INET : 'SocketConnectorIPv4',
socket.AF_INET6: 'SocketConnectorIPv6',
}
try:
from struct import Struct
except ImportError:
......@@ -89,22 +94,65 @@ def resolve(hostname):
return None
return address_list[0]
def getAddressType(address):
"Return the type (IPv4 or IPv6) of an ip"
(host, port) = address
for af_type in SOCKET_CONNECTORS_DICT.keys():
try :
socket.inet_pton(af_type, host)
except:
continue
else:
break
else:
raise ValueError("Unknown type of host", host)
return af_type
def getConnectorFromAddress(address):
address_type = getAddressType(address)
return SOCKET_CONNECTORS_DICT[address_type]
def parseNodeAddress(address, port_opt=None):
if ']' in address:
(ip, port) = address.split(']')
ip = ip.lstrip('[')
port = port.lstrip(':')
if port == '':
port = port_opt
elif ':' in address:
(ip, port) = address.split(':')
ip = resolve(ip)
else:
ip = address
port = port_opt
if port is None:
raise ValueError
return (ip, int(port))
def parseMasterList(masters, except_node=None):
if not masters:
return []
assert masters, 'At least one master must be defined'
socket_connector = ''
# load master node list
master_node_list = []
# XXX: support '/' and ' ' as separator
masters = masters.replace('/', ' ')
for node in masters.split(' '):
ip_address, port = node.split(':')
ip_address = resolve(ip_address)
address = (ip_address, int(port))
address = parseNodeAddress(node)
if (address != except_node):
master_node_list.append(address)
return tuple(master_node_list)
socket_connector_temp = getConnectorFromAddress(address)
if socket_connector == '':
socket_connector = socket_connector_temp
elif socket_connector == socket_connector_temp:
pass
else:
return TypeError, (" Wrong connector type : you're trying to use ipv6 and ipv4 simultaneously")
return tuple(master_node_list), socket_connector
class Enum(dict):
"""
......
......@@ -45,9 +45,6 @@ class Application(object):
last_transaction = ZERO_TID
def __init__(self, config):
# always use default connector for now
self.connector_handler = getConnectorHandler()
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
......@@ -57,10 +54,11 @@ class Application(object):
self.server = config.getBind()
self.storage_readiness = set()
for address in config.getMasters():
self.nm.createMaster(address=address)
master_addresses, connector_name = config.getMasters()
self.connector_handler = getConnectorHandler(connector_name)
for master_address in master_addresses :
self.nm.createMaster(address=master_address)
neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))
# Partition table
......
......@@ -36,8 +36,8 @@ action_dict = {
}
class TerminalNeoCTL(object):
def __init__(self, ip, port, handler):
self.neoctl = NeoCTL(ip, port, handler)
def __init__(self, address):
self.neoctl = NeoCTL(address)
# Utility methods (could be functions)
def asNodeState(self, value):
......@@ -187,8 +187,8 @@ class TerminalNeoCTL(object):
class Application(object):
"""The storage node application."""
def __init__(self, ip, port, handler):
self.neoctl = TerminalNeoCTL(ip, port, handler)
def __init__(self, address):
self.neoctl = TerminalNeoCTL(address)
def execute(self, args):
"""Execute the command given."""
......
......@@ -20,6 +20,7 @@ from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager
from neo.neoctl.handler import CommandEventHandler
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from neo.lib.util import getConnectorFromAddress
class NotReadyException(Exception):
pass
......@@ -29,9 +30,10 @@ class NeoCTL(object):
connection = None
connected = False
def __init__(self, ip, port, handler):
self.connector_handler = getConnectorHandler(handler)
self.server = (ip, port)
def __init__(self, address):
connector_name = getConnectorFromAddress(address)
self.connector_handler = getConnectorHandler(connector_name)
self.server = address
self.em = EventManager()
self.handler = CommandEventHandler(self)
self.response_queue = []
......
......@@ -19,6 +19,7 @@
import sys
from optparse import OptionParser
from neo.lib import setupLog
from neo.lib.util import parseNodeAddress
parser = OptionParser()
parser.add_option('-v', '--verbose', action = 'store_true',
......@@ -29,16 +30,13 @@ parser.add_option('--handler', help = 'specify the connection handler')
def main(args=None):
(options, args) = parser.parse_args(args=args)
address = options.address
if ':' in address:
address, port = address.split(':', 1)
port = int(port)
if options.address is not None:
address = parseNodeAddress(options.address, 9999)
else:
port = 9999
handler = options.handler or "SocketConnector"
address = ('127.0.0.1', 9999)
setupLog('NEOCTL', options.verbose)
from neo.neoctl.app import Application
print Application(address, port, handler).execute(args)
print Application(address).execute(args)
......@@ -41,9 +41,6 @@ class Application(object):
"""The storage node application."""
def __init__(self, config):
# always use default connector for now
self.connector_handler = getConnectorHandler()
# set the cluster name
self.name = config.getCluster()
......@@ -54,9 +51,11 @@ class Application(object):
self.dm = buildDatabaseManager(config.getAdapter(), config.getDatabase())
# load master nodes
for address in config.getMasters():
self.nm.createMaster(address=address)
master_addresses, connector_name = config.getMasters()
self.connector_handler = getConnectorHandler(connector_name)
for master_address in master_addresses :
self.nm.createMaster(address=master_address)
# set the bind address
self.server = config.getBind()
neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))
......
......@@ -21,10 +21,13 @@ import random
import unittest
import tempfile
import MySQLdb
import socket
import neo
from mock import Mock
from neo.lib import protocol
from neo.lib.protocol import Packets
from neo.lib.util import getAddressType
from time import time, gmtime
from struct import pack, unpack
......@@ -33,6 +36,22 @@ DB_ADMIN = os.getenv('NEO_DB_ADMIN', 'root')
DB_PASSWD = os.getenv('NEO_DB_PASSWD', None)
DB_USER = os.getenv('NEO_DB_USER', 'test')
IP_VERSION_FORMAT_DICT = {
socket.AF_INET: '127.0.0.1',
socket.AF_INET6: '::1',
}
ADDRESS_TYPE = socket.AF_INET
def buildUrlFromString(address):
try:
socket.inet_pton(socket.AF_INET6, address)
address = '[%s]' % address
except:
pass
return address
class NeoTestBase(unittest.TestCase):
def setUp(self):
sys.stdout.write(' * %s ' % (self.id(), ))
......@@ -47,8 +66,10 @@ class NeoTestBase(unittest.TestCase):
class NeoUnitTestBase(NeoTestBase):
""" Base class for neo tests, implements common checks """
local_ip = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE]
def prepareDatabase(self, number, admin=DB_ADMIN, password=DB_PASSWD,
user=DB_USER, prefix=DB_PREFIX):
user=DB_USER, prefix=DB_PREFIX, address_type = ADDRESS_TYPE):
""" create empties databases """
# SQL connection
connect_arg_dict = {'user': admin}
......@@ -69,11 +90,13 @@ class NeoUnitTestBase(NeoTestBase):
def getMasterConfiguration(self, cluster='main', master_number=2,
replicas=2, partitions=1009, uuid=None):
assert master_number >= 1 and master_number <= 10
masters = [('127.0.0.1', 10010 + i) for i in xrange(master_number)]
masters = ([(self.local_ip, 10010 + i)
for i in xrange(master_number)])
return Mock({
'getCluster': cluster,
'getBind': masters[0],
'getMasters': masters,
'getMasters': (masters, getAddressType((
self.local_ip, 0))),
'getReplicas': replicas,
'getPartitions': partitions,
'getUUID': uuid,
......@@ -83,13 +106,15 @@ class NeoUnitTestBase(NeoTestBase):
index=0, prefix=DB_PREFIX, uuid=None):
assert master_number >= 1 and master_number <= 10
assert index >= 0 and index <= 9
masters = [('127.0.0.1', 10010 + i) for i in xrange(master_number)]
masters = [(buildUrlFromString(self.local_ip),
10010 + i) for i in xrange(master_number)]
database = '%s@%s%s' % (DB_USER, prefix, index)
return Mock({
'getCluster': cluster,
'getName': 'storage',
'getBind': ('127.0.0.1', 10020 + index),
'getMasters': masters,
'getBind': (masters[0], 10020 + index),
'getMasters': (masters, getAddressType((
self.local_ip, 0))),
'getDatabase': database,
'getUUID': uuid,
'getReset': False,
......
......@@ -19,13 +19,13 @@ import unittest
from cPickle import dumps
from mock import Mock, ReturnValues
from ZODB.POSException import StorageTransactionError, UndoError, ConflictError
from neo.tests import NeoUnitTestBase
from neo.tests import NeoUnitTestBase, buildUrlFromString, ADDRESS_TYPE
from neo.client.app import Application, RevisionIndex
from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
from neo.client.exception import NEOStorageDoesNotExistError
from neo.lib.protocol import Packet, Packets, Errors, INVALID_TID, \
INVALID_PARTITION
from neo.lib.util import makeChecksum
from neo.lib.util import makeChecksum, SOCKET_CONNECTORS_DICT
import time
def _getMasterConnection(self):
......@@ -103,8 +103,10 @@ class ClientApplicationTests(NeoUnitTestBase):
return packet.decode()
return packet
def getApp(self, master_nodes='127.0.0.1:10010', name='test',
connector='SocketConnector', **kw):
def getApp(self, master_nodes=None, name='test', **kw):
connector = SOCKET_CONNECTORS_DICT[ADDRESS_TYPE]
if master_nodes is None:
master_nodes = '%s:10010' % buildUrlFromString(self.local_ip)
app = Application(master_nodes, name, connector, **kw)
self._to_stop_list.append(app)
app.dispatcher = Mock({ })
......@@ -999,7 +1001,7 @@ class ClientApplicationTests(NeoUnitTestBase):
def test_askPrimary(self):
""" _askPrimary is private but test it anyway """
app = self.getApp('')
app = self.getApp()
conn = Mock()
app.master_conn = conn
app.primary_handler = Mock()
......
......@@ -30,8 +30,9 @@ import threading
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.lib.util import dump
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase
from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT
from neo.client.Storage import Storage
NEO_MASTER = 'neomaster'
......@@ -171,7 +172,9 @@ class NEOCluster(object):
db_super_user=DB_ADMIN, db_super_password=DB_PASSWD,
cleanup_on_delete=False, temp_dir=None,
clear_databases=True, adapter='MySQL',
verbose=True):
verbose=True,
address_type=ADDRESS_TYPE,
):
self.zodb_storage_list = []
self.cleanup_on_delete = cleanup_on_delete
self.verbose = verbose
......@@ -181,6 +184,8 @@ class NEOCluster(object):
self.db_user = db_user
self.db_password = db_password
self.db_list = db_list
self.address_type = address_type
self.local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
if clear_databases:
self.setupDB()
self.process_dict = {}
......@@ -192,12 +197,16 @@ class NEOCluster(object):
admin_port = self.__allocatePort()
self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
master_node_list = [self.__allocatePort() for i in xrange(master_node_count)]
self.master_nodes = '/'.join('127.0.0.1:%s' % (x, ) for x in master_node_list)
self.master_nodes = '/'.join('%s:%s' % (
buildUrlFromString(self.local_ip), x, )
for x in master_node_list)
# create admin node
self.__newProcess(NEO_ADMIN, {
'--cluster': self.cluster_name,
'--name': 'admin',
'--bind': '127.0.0.1:%d' % (admin_port, ),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip), admin_port, ),
'--masters': self.master_nodes,
})
# create master nodes
......@@ -205,7 +214,8 @@ class NEOCluster(object):
self.__newProcess(NEO_MASTER, {
'--cluster': self.cluster_name,
'--name': 'master_%d' % index,
'--bind': '127.0.0.1:%d' % (port, ),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip), port, ),
'--masters': self.master_nodes,
'--replicas': replicas,
'--partitions': partitions,
......@@ -215,14 +225,16 @@ class NEOCluster(object):
self.__newProcess(NEO_STORAGE, {
'--cluster': self.cluster_name,
'--name': 'storage_%d' % index,
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip),
0 ),
'--masters': self.master_nodes,
'--database': '%s:%s@%s' % (db_user, db_password, db),
'--adapter': adapter,
})
# create neoctl
self.neoctl = NeoCTL('127.0.0.1', admin_port,
'SocketConnector')
self.neoctl = NeoCTL((self.local_ip, admin_port))
def __newProcess(self, command, arguments):
uuid = self.__allocateUUID()
arguments['--uuid'] = uuid
......@@ -236,10 +248,10 @@ class NEOCluster(object):
def __allocatePort(self):
port_set = self.port_set
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = socket.socket(self.address_type, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
while True:
s.bind(('127.0.0.1', 0))
s.bind((self.local_ip, 0))
port = s.getsockname()[1]
if port not in port_set:
break
......@@ -343,12 +355,12 @@ class NEOCluster(object):
def getNEOCTL(self):
return self.neoctl
def getZODBStorage(self, **kw):
def getZODBStorage(self,connector = SOCKET_CONNECTORS_DICT[ADDRESS_TYPE], **kw):
master_nodes = self.master_nodes.replace('/', ' ')
result = Storage(
master_nodes=master_nodes,
name=self.cluster_name,
connector='SocketConnector',
connector=connector,
logfile=os.path.join(self.temp_dir, 'client.log'),
verbose=self.verbose,
**kw
......
......@@ -19,13 +19,17 @@ import os
import unittest
import transaction
import ZODB
import socket
from struct import pack, unpack
from neo.neoctl.neoctl import NeoCTL
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
from neo.lib.util import SOCKET_CONNECTORS_DICT
from neo.tests.functional import NEOCluster, NEOFunctionalTest
from neo.tests import IP_VERSION_FORMAT_DICT
TREE_SIZE = 6
......@@ -263,6 +267,23 @@ class ClientTests(NEOFunctionalTest):
self.assertRaises(ConflictError, st2.tpc_vote, t2)
self.runWithTimeout(40, test)
def testIPv6Client(self):
""" Test the connectivity of an IPv6 connection for neo client """
def test():
"""
Implement the IPv6Client test
"""
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir = self.getTempDirectory(),
address_type = socket.AF_INET6
)
neoctl = NeoCTL(('::1', 0))
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
self.runWithTimeout(40, test)
def testDelayedLocksCancelled(self):
"""
Hold a lock on an object, try to get another lock on the same
......
......@@ -20,7 +20,8 @@ from mock import Mock
from neo.lib import protocol
from neo.tests import NeoUnitTestBase
from neo.lib.protocol import Packet, NodeTypes, NodeStates
from neo.master.handlers.election import ClientElectionHandler, ServerElectionHandler
from neo.master.handlers.election import ClientElectionHandler, \
ServerElectionHandler
from neo.master.app import Application
from neo.lib.exception import ElectionFailure