connector.py 6.8 KB
Newer Older
1
#
Julien Muchembled's avatar
Julien Muchembled committed
2
# Copyright (C) 2009-2014  Nexedi SA
3
#
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 17 18

import socket
import errno
19 20 21 22 23

# Global connector registry.
# Fill by calling registerConnectorHandler.
# Read by calling getConnectorHandler.
connector_registry = {}
Olivier Cros's avatar
Olivier Cros committed
24
DEFAULT_CONNECTOR = 'SocketConnectorIPv4'
25 26

def registerConnectorHandler(connector_handler):
Vincent Pelletier's avatar
Vincent Pelletier committed
27
    connector_registry[connector_handler.__name__] = connector_handler
28

29 30 31
def getConnectorHandler(connector=None):
    if connector is None:
        connector = DEFAULT_CONNECTOR
Vincent Pelletier's avatar
Vincent Pelletier committed
32 33 34 35 36 37 38
    if isinstance(connector, basestring):
        connector_handler = connector_registry.get(connector)
    else:
        # Allow to directly provide a handler class without requiring to
        # register it first.
        connector_handler = connector
    return connector_handler
39 40

class SocketConnector:
Vincent Pelletier's avatar
Vincent Pelletier committed
41 42 43 44 45 46 47 48 49 50 51 52
    """ This class is a wrapper for a socket """

    is_listening = False
    remote_addr = None
    is_closed = None

    def __init__(self, s=None, accepted_from=None):
        self.accepted_from = accepted_from
        if accepted_from is not None:
            self.remote_addr = accepted_from
            self.is_listening = False
            self.is_closed = False
53
        if s is None:
54
            self.socket = socket.socket(self.af_type, socket.SOCK_STREAM)
Vincent Pelletier's avatar
Vincent Pelletier committed
55 56
        else:
            self.socket = s
57
        self.socket_fd = self.socket.fileno()
58 59
        # always use non-blocking sockets
        self.socket.setblocking(0)
60 61
        # disable Nagle algorithm to reduce latency
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
62

Vincent Pelletier's avatar
Vincent Pelletier committed
63 64 65 66
    def makeClientConnection(self, addr):
        self.is_closed = False
        self.remote_addr = addr
        try:
67 68 69 70 71 72 73 74
            self.socket.connect(addr)
        except socket.error, (err, errmsg):
            if err == errno.EINPROGRESS:
                raise ConnectorInProgressException
            if err == errno.ECONNREFUSED:
                raise ConnectorConnectionRefusedException
            raise ConnectorException, 'makeClientConnection to %s failed:' \
                ' %s:%s' % (addr, err, errmsg)
Vincent Pelletier's avatar
Vincent Pelletier committed
75 76 77 78 79 80 81 82 83 84

    def makeListeningConnection(self, addr):
        self.is_closed = False
        self.is_listening = True
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(addr)
            self.socket.listen(5)
        except socket.error, (err, errmsg):
            self.socket.close()
85 86
            raise ConnectorException, 'makeListeningConnection on %s failed:' \
                    ' %s:%s' % (addr, err, errmsg)
Vincent Pelletier's avatar
Vincent Pelletier committed
87 88 89 90

    def getError(self):
        return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)

91
    def getAddress(self):
Olivier Cros's avatar
Olivier Cros committed
92
        raise NotImplementedError
93

Vincent Pelletier's avatar
Vincent Pelletier committed
94
    def getDescriptor(self):
95 96
        # this descriptor must only be used by the event manager, where it
        # guarantee unicity only while the connector is opened and registered
97
        # in epoll
98
        return self.socket_fd
Vincent Pelletier's avatar
Vincent Pelletier committed
99 100 101

    def getNewConnection(self):
        try:
Olivier Cros's avatar
Olivier Cros committed
102 103
            (new_s, addr) = self._accept()
            new_s = self.__class__(new_s, accepted_from=addr)
104
            return (new_s, addr)
Vincent Pelletier's avatar
Vincent Pelletier committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
            raise ConnectorException, 'getNewConnection failed: %s:%s' % \
                (err, errmsg)

    def shutdown(self):
        # This may fail if the socket is not connected.
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass

    def receive(self):
        try:
            return self.socket.recv(4096)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
124
            if err in (errno.ECONNREFUSED, errno.EHOSTUNREACH):
Vincent Pelletier's avatar
Vincent Pelletier committed
125
                raise ConnectorConnectionRefusedException
126
            if err in (errno.ECONNRESET, errno.ETIMEDOUT):
Vincent Pelletier's avatar
Vincent Pelletier committed
127 128 129 130 131 132 133 134 135
                raise ConnectorConnectionClosedException
            raise ConnectorException, 'receive failed: %s:%s' % (err, errmsg)

    def send(self, msg):
        try:
            return self.socket.send(msg)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
136
            if err in (errno.ECONNRESET, errno.ETIMEDOUT, errno.EPIPE):
Vincent Pelletier's avatar
Vincent Pelletier committed
137
                raise ConnectorConnectionClosedException
138
            raise ConnectorException, 'send failed: %s:%s' % (err, errmsg)
Vincent Pelletier's avatar
Vincent Pelletier committed
139 140 141 142 143 144

    def close(self):
        self.is_closed = True
        return self.socket.close()

    def __repr__(self):
145
        if self.is_closed:
Vincent Pelletier's avatar
Vincent Pelletier committed
146
            fileno = '?'
147 148
        else:
            fileno = self.socket_fd
149
        result = '<%s at 0x%x fileno %s %s, ' % (self.__class__.__name__,
Vincent Pelletier's avatar
Vincent Pelletier committed
150 151 152
                 id(self), fileno, self.socket.getsockname())
        if self.is_closed is None:
            result += 'never opened'
153
        else:
Vincent Pelletier's avatar
Vincent Pelletier committed
154 155 156 157 158 159 160 161 162 163 164 165 166
            if self.is_closed:
                result += 'closed '
            else:
                result += 'opened '
            if self.is_listening:
                result += 'listening'
            else:
                if self.accepted_from is None:
                    result += 'to'
                else:
                    result += 'from'
                result += ' %s' % (self.remote_addr, )
        return result + '>'
167

Olivier Cros's avatar
Olivier Cros committed
168 169
    def _accept(self):
        raise NotImplementedError
170

Olivier Cros's avatar
Olivier Cros committed
171
class SocketConnectorIPv4(SocketConnector):
172 173
    " Wrapper for IPv4 sockets"
    af_type = socket.AF_INET
Olivier Cros's avatar
Olivier Cros committed
174

175 176
    def _accept(self):
        return self.socket.accept()
Olivier Cros's avatar
Olivier Cros committed
177

178
    def getAddress(self):
Olivier Cros's avatar
Olivier Cros committed
179
        return self.socket.getsockname()
180

Olivier Cros's avatar
Olivier Cros committed
181
class SocketConnectorIPv6(SocketConnector):
182 183 184
    " Wrapper for IPv6 sockets"
    af_type = socket.AF_INET6

Olivier Cros's avatar
Olivier Cros committed
185
    def _accept(self):
186
        new_s, addr =  self.socket.accept()
187
        return new_s, addr[:2]
188

Olivier Cros's avatar
Olivier Cros committed
189
    def getAddress(self):
190
        return self.socket.getsockname()[:2]
191

Olivier Cros's avatar
Olivier Cros committed
192 193
registerConnectorHandler(SocketConnectorIPv4)
registerConnectorHandler(SocketConnectorIPv6)
194

195
class ConnectorException(Exception):
196 197
    pass

198
class ConnectorTryAgainException(ConnectorException):
199 200
    pass

201
class ConnectorInProgressException(ConnectorException):
202 203
    pass

204
class ConnectorConnectionClosedException(ConnectorException):
205 206
    pass

207
class ConnectorConnectionRefusedException(ConnectorException):
208
    pass
209