Commit f3ef233c authored by Aurel's avatar Aurel

implement a connector wrapper for sockets, thus it makes the Connection

objects independant from the low level library used


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@250 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent a7a0c4c6
......@@ -24,6 +24,7 @@ import logging
from neo.client.dispatcher import Dispatcher
from neo.event import EventManager
from neo.util import dump
from neo import connector as Connector
class NEOStorageError(POSException.StorageError):
pass
......@@ -40,7 +41,7 @@ class Storage(BaseStorage.BaseStorage,
__name__ = 'NEOStorage'
def __init__(self, master_nodes, name, read_only=False, **kw):
def __init__(self, master_nodes, name, connector, read_only=False, **kw):
self._is_read_only = read_only
# Transaction must be under protection of lock
l = Lock()
......@@ -56,11 +57,12 @@ class Storage(BaseStorage.BaseStorage,
dispatcher = Dispatcher(em, request_queue)
dispatcher.setDaemon(True)
# Import here to prevent recursive import
connector_handler = getattr(Connector, connector)
from neo.client.app import Application
self.app = Application(master_nodes, name, em, dispatcher,
request_queue)
request_queue, connector_handler)
# Connect to primary master node
dispatcher.connectToPrimaryMasterNode(self.app)
dispatcher.connectToPrimaryMasterNode(self.app, self.app.connector_handler)
# Start dispatcher
dispatcher.start()
......
......@@ -69,10 +69,11 @@ class ConnectionPool(object):
while 1:
logging.info('trying to connect to %s:%d', *addr)
app.local_var.node_not_ready = 0
conn = MTClientConnection(app.em, handler, addr)
conn = MTClientConnection(app.em, handler, addr,
connector_handler=self.app.connector_handler)
conn.lock()
try:
if conn.getSocket() is None:
if conn.getConnector() is None:
# This happens, if a connection could not be established.
logging.error('Connection to storage node %s failed', addr)
return None
......@@ -176,12 +177,13 @@ class ConnectionPool(object):
class Application(object):
"""The client node application."""
def __init__(self, master_nodes, name, em, dispatcher, request_queue, **kw):
def __init__(self, master_nodes, name, em, dispatcher, request_queue, connector, **kw):
logging.basicConfig(level = logging.DEBUG)
logging.debug('master node address are %s' %(master_nodes,))
# Internal Attributes common to all thread
self.name = name
self.em = em
self.connector_handler = connector
self.dispatcher = dispatcher
self.nm = NodeManager()
self.cp = ConnectionPool(self)
......
......@@ -21,6 +21,7 @@ class NeoStorage(BaseConfig):
def open(self):
from Storage import Storage
return Storage(master_nodes = self.config.master_nodes, name = self.config.name)
return Storage(master_nodes = self.config.master_nodes, name = self.config.name,
connector = self.config.connector)
......@@ -63,7 +63,7 @@ class Dispatcher(Thread):
return True
return False
def connectToPrimaryMasterNode(self, app):
def connectToPrimaryMasterNode(self, app, connector):
"""Connect to a primary master node.
This can be called either at bootstrap or when
client got disconnected during process"""
......@@ -95,7 +95,7 @@ class Dispatcher(Thread):
else:
addr, port = app.primary_master_node.getServer()
# Request Node Identification
conn = MTClientConnection(app.em, handler, (addr, port))
conn = MTClientConnection(app.em, handler, (addr, port), connector_handler=connector)
if app.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
app.nm.add(n)
......
......@@ -14,5 +14,10 @@
Give the name of the cluster
</description>
</key>
<key name="connector" required="yes">
<description>
Give the name of the connector used at low-level
</description>
</key>
</sectiontype>
</component>
......@@ -69,6 +69,9 @@ class ConfigurationManager:
def getPartitions(self):
return int(self['partitions'])
def getConnector(self):
return str(self['connector'])
def getName(self):
return self['name']
......
......@@ -15,42 +15,45 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import socket
import errno
import logging
from threading import RLock
from neo.protocol import Packet, ProtocolError
from neo.event import IdleEvent
from neo.connector import *
class BaseConnection(object):
"""A base connection."""
def __init__(self, event_manager, handler, s = None, addr = None):
def __init__(self, event_manager, handler, connector = None,
addr = None, connector_handler = None):
self.em = event_manager
self.s = s
self.connector = connector
self.addr = addr
self.handler = handler
if s is not None:
if connector is not None:
self.connector_handler = connector.__class__
event_manager.register(self)
else:
self.connector_handler = connector_handler
def lock(self):
return 1
def unlock(self):
return None
def getSocket(self):
return self.s
def getConnector(self):
return self.connector
def getDescriptor(self):
return self.s.fileno()
return self.connector.getDescriptor()
def setSocket(self, s):
if self.s is not None:
raise RuntimeError, 'cannot overwrite a socket in a connection'
if s is not None:
self.s = s
def setConnector(self, connector):
if self.connector is not None:
raise RuntimeError, 'cannot overwrite a connector in a connection'
if connector is not None:
self.connector = connector
self.em.register(self)
def getAddress(self):
......@@ -76,41 +79,40 @@ class BaseConnection(object):
class ListeningConnection(BaseConnection):
"""A listen connection."""
def __init__(self, event_manager, handler, addr = None, **kw):
def __init__(self, event_manager, handler, addr = None,
connector_handler = None, **kw):
logging.info('listening to %s:%d', *addr)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.setblocking(0)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
except:
s.close()
raise
BaseConnection.__init__(self, event_manager, handler, s = s, addr = addr)
BaseConnection.__init__(self, event_manager, handler,
addr = addr,
connector_handler = connector_handler)
connector = self.connector_handler()
connector.makeListeningConnection(addr)
self.setConnector(connector)
self.em.addReader(self)
def readable(self):
try:
new_s, addr = self.s.accept()
new_s, addr = self.connector.getNewConnection()
logging.info('accepted a connection from %s:%d', *addr)
self.handler.connectionAccepted(self, new_s, addr)
except socket.error, m:
if m[0] == errno.EAGAIN:
return
raise
except ConnectorTryAgainException:
pass
class Connection(BaseConnection):
"""A connection."""
def __init__(self, event_manager, handler, s = None, addr = None):
def __init__(self, event_manager, handler,
connector = None, addr = None,
connector_handler = None):
self.read_buf = []
self.write_buf = []
self.cur_id = 0
self.event_dict = {}
self.aborted = False
self.uuid = None
BaseConnection.__init__(self, event_manager, handler, s = s, addr = addr)
if s is not None:
BaseConnection.__init__(self, event_manager, handler,
connector = connector, addr = addr,
connector_handler = connector_handler)
if connector is not None:
event_manager.addReader(self)
def getUUID(self):
......@@ -130,20 +132,15 @@ class Connection(BaseConnection):
def close(self):
"""Close the connection."""
s = self.s
em = self.em
if s is not None:
logging.debug('closing a socket for %s:%d', *(self.addr))
if self.connector is not None:
logging.debug('closing a connector for %s:%d', *(self.addr))
em.removeReader(self)
em.removeWriter(self)
em.unregister(self)
try:
# This may fail if the socket is not connected.
s.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
s.close()
self.s = None
em.unregister(self)
self.connector.shutdown()
self.connector.close()
self.connector = None
for event in self.event_dict.itervalues():
em.removeIdleEvent(event)
self.event_dict.clear()
......@@ -153,7 +150,7 @@ class Connection(BaseConnection):
def abort(self):
"""Abort dealing with this connection."""
logging.debug('aborting a socket for %s:%d', *(self.addr))
logging.debug('aborting a connetor for %s:%d', *(self.addr))
self.aborted = True
def writable(self):
......@@ -209,37 +206,33 @@ class Connection(BaseConnection):
del self.read_buf[:]
def pending(self):
return self.s is not None and len(self.write_buf) != 0
return self.connector is not None and len(self.write_buf) != 0
def recv(self):
"""Receive data from a socket."""
s = self.s
"""Receive data from a connector."""
try:
r = s.recv(4096)
r = self.connector.receive()
if not r:
logging.error('cannot read')
self.handler.connectionClosed(self)
self.close()
else:
self.read_buf.append(r)
except socket.error, m:
if m[0] == errno.EAGAIN:
pass
else:
logging.error('%s', m[1])
self.handler.connectionClosed(self)
self.close()
except ConnectorTryAgainException:
pass
except:
self.handler.connectionClosed(self)
self.close()
def send(self):
"""Send data to a socket."""
s = self.s
"""Send data to a connector."""
if self.write_buf:
if len(self.write_buf) == 1:
msg = self.write_buf[0]
else:
msg = ''.join(self.write_buf)
try:
r = s.send(msg)
r = self.connector.send(msg)
if not r:
logging.error('cannot write')
self.handler.connectionClosed(self)
......@@ -248,17 +241,15 @@ class Connection(BaseConnection):
del self.write_buf[:]
else:
self.write_buf = [msg[r:]]
except socket.error, m:
if m[0] == errno.EAGAIN:
return
else:
logging.error('%s', m[1])
self.handler.connectionClosed(self)
self.close()
except ConnectorTryAgainException:
return
except:
self.handler.connectionClosed(self)
self.close()
def addPacket(self, packet):
"""Add a packet into the write buffer."""
if self.s is None:
if self.connector is None:
return
try:
......@@ -291,7 +282,7 @@ class Connection(BaseConnection):
The additional timeout defines the amount of time after the timeout
to invoke a timeoutExpired callback. If it is zero, no ping is sent, and
the callback is executed immediately."""
if self.s is None:
if self.connector is None:
return
event = IdleEvent(self, msg_id, timeout, additional_timeout)
......@@ -300,22 +291,19 @@ class Connection(BaseConnection):
class ClientConnection(Connection):
"""A connection from this node to a remote node."""
def __init__(self, event_manager, handler, addr = None, **kw):
def __init__(self, event_manager, handler, addr = None,
connector_handler = None, **kw):
self.connecting = True
Connection.__init__(self, event_manager, handler, addr = addr)
Connection.__init__(self, event_manager, handler, addr = addr,
connector_handler = connector_handler)
handler.connectionStarted(self)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.setSocket(s)
connector = self.connector_handler()
self.setConnector(connector)
try:
s.setblocking(0)
s.connect(addr)
except socket.error, m:
if m[0] == errno.EINPROGRESS:
event_manager.addWriter(self)
else:
raise
connector.makeClientConnection(addr)
except ConnectorInProgressException:
event_manager.addWriter(self)
else:
self.connecting = False
self.handler.connectionCompleted(self)
......@@ -327,7 +315,7 @@ class ClientConnection(Connection):
def writable(self):
"""Called when self is writable."""
if self.connecting:
err = self.s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
err = self.connector.getError()
if err:
self.handler.connectionFailed(self)
self.close()
......@@ -378,3 +366,4 @@ class MTServerConnection(ServerConnection):
def unlock(self):
self.release()
#
# Copyright (C) 2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import socket
import errno
import logging
class SocketConnector:
""" This class is a wrapper for a socket """
def __init__(self, s=None):
if s is None:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.socket = s
def __getattr__(self, name):
""" fallback to default socket methods """
return getattr(self.socket, name)
def makeClientConnection(self, addr):
try:
self.socket.setblocking(0)
self.socket.connect(addr)
except socket.error, m:
if m[0] == errno.EINPROGRESS:
raise ConnectorInProgressException
else:
logging.error('%s', m[1])
raise
def makeListeningConnection(self, addr):
try:
self.socket.setblocking(0)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(addr)
self.socket.listen(5)
except:
self.socket.close()
raise
def getError(self):
return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
def getDescriptor(self):
return self.socket.fileno()
def getNewConnection(self):
try:
new_s, addr = self.socket.accept()
new_s = SocketConnector(new_s)
return new_s, addr
except socket.error, m:
if m[0] == errno.EAGAIN:
raise ConnectorTryAgainException
else:
logging.error('%s', m[1])
raise
def shutdown(self):
# This may fail if the socket is not connected.
try:
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
def receive(self):
try:
return self.socket.recv(4096)
except socket.error, m:
if m[0] == errno.EAGAIN:
raise ConnectorTryAgainException
else:
logging.error('%s', m[1])
raise
def send(self, msg):
try:
return self.socket.send(msg)
except socket.error, m:
if m[0] == errno.EAGAIN:
raise ConnectorTryAgainException
else:
logging.error('%s', m[1])
raise
class ConnectorTryAgainException(Exception): pass
class ConnectorInProgressException(Exception): pass
......@@ -89,10 +89,10 @@ class SelectEventManager(object):
return self.connection_dict.values()
def register(self, conn):
self.connection_dict[conn.getSocket()] = conn
self.connection_dict[conn.getConnector()] = conn
def unregister(self, conn):
del self.connection_dict[conn.getSocket()]
del self.connection_dict[conn.getConnector()]
def poll(self, timeout = 1):
rlist, wlist, xlist = select(self.reader_set, self.writer_set, self.exc_list,
......@@ -146,16 +146,16 @@ class SelectEventManager(object):
pass
def addReader(self, conn):
self.reader_set.add(conn.getSocket())
self.reader_set.add(conn.getConnector())
def removeReader(self, conn):
self.reader_set.discard(conn.getSocket())
self.reader_set.discard(conn.getConnector())
def addWriter(self, conn):
self.writer_set.add(conn.getSocket())
self.writer_set.add(conn.getConnector())
def removeWriter(self, conn):
self.writer_set.discard(conn.getSocket())
self.writer_set.discard(conn.getConnector())
class EpollEventManager(object):
"""This class manages connections and events based on epoll(5)."""
......@@ -186,6 +186,7 @@ class EpollEventManager(object):
for fd in rlist:
try:
conn = self.connection_dict[fd]
#logging.info("conn is %s" %(conn,))
conn.lock()
try:
conn.readable()
......
......@@ -57,11 +57,11 @@ class EventHandler(object):
"""Called when a connection failed."""
logging.debug('connection failed for %s:%d', *(conn.getAddress()))
def connectionAccepted(self, conn, s, addr):
def connectionAccepted(self, conn, connector, addr):
"""Called when a connection is accepted."""
logging.debug('connection accepted from %s:%d', *addr)
new_conn = ServerConnection(conn.getEventManager(), conn.getHandler(),
s = s, addr = addr)
connector = connector, addr = addr)
# A request for a node identification should arrive.
new_conn.expectMessage(timeout = 10, additional_timeout = 0)
......
......@@ -36,6 +36,7 @@ from neo.master.service import ServiceEventHandler
from neo.master.secondary import SecondaryEventHandler
from neo.pt import PartitionTable
from neo.util import dump
from neo import connector
class Application(object):
"""The master node application."""
......@@ -46,6 +47,8 @@ class Application(object):
self.num_replicas = config.getReplicas()
self.num_partitions = config.getPartitions()
self.name = config.getName()
connector_handler = config.getConnector()
self.connector_handler = getattr(connector, connector_handler)
logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s',
self.num_replicas, self.num_partitions, self.name)
......@@ -94,7 +97,8 @@ class Application(object):
self.nm.add(MasterNode(server = server))
# Make a listening port.
ListeningConnection(self.em, None, addr = self.server)
ListeningConnection(self.em, None, addr = self.server,
connector_handler = self.connector_handler)
# Start the election of a primary master node.
self.electPrimary()
......@@ -172,8 +176,8 @@ class Application(object):
# Try to connect to master nodes.
if self.unconnected_master_node_set:
for addr in list(self.unconnected_master_node_set):
ClientConnection(em, handler, addr = addr)
ClientConnection(em, handler, addr = addr,
connector_handler = self.connector_handler)
if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0:
break
......
......@@ -33,6 +33,7 @@ from neo.storage.bootstrap import BootstrapEventHandler
from neo.storage.verification import VerificationEventHandler
from neo.storage.operation import OperationEventHandler
from neo.storage.replicator import Replicator
from neo import connector
class Application(object):
"""The storage node application."""
......@@ -44,6 +45,8 @@ class Application(object):
self.num_replicas = None
self.name = config.getName()
logging.debug('the name is %s', self.name)
connector_handler = config.getConnector()
self.connector_handler = getattr(connector, connector_handler)
self.server = config.getServer()
logging.debug('IP address is %s, port is %d', *(self.server))
......@@ -126,7 +129,8 @@ class Application(object):
self.nm.add(MasterNode(server = server))
# Make a listening port.
ListeningConnection(self.em, None, addr = self.server)
ListeningConnection(self.em, None, addr = self.server,
connector_handler = self.connector_handler)
# Connect to a primary master node, verify data, and
# start the operation. This cycle will be executed permentnly,
......@@ -208,7 +212,8 @@ class Application(object):
index += 1
ClientConnection(em, handler, \
addr = self.trying_master_node.getServer())
addr = self.trying_master_node.getServer(),
connector_handler = self.connector_handler)
t = time()
def verifyData(self):
......
......@@ -339,7 +339,8 @@ class Replicator(object):
if self.current_connection is None:
handler = ReplicationEventHandler(app)
self.current_connection = ClientConnection(app.em, handler,
addr = addr)
addr = addr,
connector_handler = app.connector_handler)
msg_id = self.current_connection.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment