Commit 10598deb by Grégory Wisniewski

Remove trailing whitespaces.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1420 71dcc9de-d417-0410-9af5-da40c76e7ee4
1 parent fb85bde4
Showing 85 changed files with 681 additions and 681 deletions
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......
......@@ -118,7 +118,7 @@ class Application(object):
nm.createMaster(address=address)
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN,
bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN,
self.uuid, self.server)
data = bootstrap.getPrimaryConnection(self.connector_handler)
(node, conn, uuid, num_partitions, num_replicas) = data
......
......@@ -27,7 +27,7 @@ class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
def askPartitionList(self, conn, packet, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s" %
logging.info("ask partition list from %s to %s for %s" %
(min_offset, max_offset, dump(uuid)))
app = self.app
# check we have one pt otherwise ask it to PMN
......@@ -43,7 +43,7 @@ class AdminEventHandler(EventHandler):
'uuid' : uuid,
'msg_id' : packet.getId()})
else:
app.sendPartitionTable(conn, min_offset, max_offset, uuid,
app.sendPartitionTable(conn, min_offset, max_offset, uuid,
packet.getId())
......@@ -96,10 +96,10 @@ class AdminEventHandler(EventHandler):
'master.')
# required it from PMN first
msg_id = self.app.master_conn.ask(Packets.AskClusterState())
self.app.dispatcher.register(msg_id, conn,
self.app.dispatcher.register(msg_id, conn,
{'msg_id' : packet.getId()})
else:
conn.answer(Packets.AnswerClusterState(self.app.cluster_state),
conn.answer(Packets.AnswerClusterState(self.app.cluster_state),
packet.getId())
def askPrimary(self, conn, packet):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -26,14 +26,14 @@ from neo.connection import ClientConnection
NO_SERVER = ('0.0.0.0', 0)
class BootstrapManager(EventHandler):
"""
"""
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, name, node_type, uuid=None, server=NO_SERVER):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
primary master node, connect to it then returns when the master node
is ready.
"""
EventHandler.__init__(self, app)
......@@ -72,7 +72,7 @@ class BootstrapManager(EventHandler):
def notReady(self, conn, packet, message):
"""
The primary master send this message when it is still not ready to
handle the client node.
handle the client node.
Close connection and restart.
"""
# master are still electing on of them
......@@ -134,7 +134,7 @@ class BootstrapManager(EventHandler):
# retry until identified to the primary
while self.primary is None or conn.getUUID() != self.primary.getUUID():
if self.current is None:
# conn closed
# conn closed
conn = None
# select a master
master_list = nm.getMasterList()
......@@ -147,7 +147,7 @@ class BootstrapManager(EventHandler):
# open the connection
addr = self.current.getAddress()
conn = ClientConnection(em, self, addr, connector_handler)
# still processing
# still processing
em.poll(1)
node = nm.getByUUID(conn.getUUID())
return (node, conn, self.uuid, self.num_partitions, self.num_replicas)
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -63,7 +63,7 @@ class Storage(BaseStorage.BaseStorage,
def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only:
raise POSException.ReadOnlyError()
return self.app.tpc_begin(transaction=transaction, tid=tid,
return self.app.tpc_begin(transaction=transaction, tid=tid,
status=status)
def tpc_vote(self, transaction):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -47,7 +47,7 @@ from neo.client.mq import MQ
from neo.util import u64, parseMasterList
class ConnectionClosed(Exception):
class ConnectionClosed(Exception):
pass
......@@ -94,7 +94,7 @@ class ConnectionPool(object):
conn.unlock()
try:
app._waitMessage(conn, msg_id,
app._waitMessage(conn, msg_id,
handler=app.storage_bootstrap_handler)
except ConnectionClosed:
logging.error('Connection to storage node %s failed', node)
......@@ -213,7 +213,7 @@ class ThreadContext(object):
return thread_data[name]
except KeyError:
raise AttributeError, name
def __setattr__(self, name, value):
thread_data = self.__getThreadData()
thread_data[name] = value
......@@ -408,7 +408,7 @@ class Application(object):
self.trying_master_node = master_list[0]
index += 1
# Connect to master
conn = MTClientConnection(self.em, self.notifications_handler,
conn = MTClientConnection(self.em, self.notifications_handler,
addr=self.trying_master_node.getAddress(),
connector_handler=self.connector_handler,
dispatcher=self.dispatcher)
......@@ -420,12 +420,12 @@ class Application(object):
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
msg_id = conn.ask(self.local_var.queue,
msg_id = conn.ask(self.local_var.queue,
Packets.AskPrimary())
finally:
conn.unlock()
try:
self._waitMessage(conn, msg_id,
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
except ConnectionClosed:
continue
......@@ -449,7 +449,7 @@ class Application(object):
finally:
conn.unlock()
try:
self._waitMessage(conn, msg_id,
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
except ConnectionClosed:
self.primary_master_node = None
......@@ -473,7 +473,7 @@ class Application(object):
Packets.AskNodeInformation())
finally:
conn.unlock()
self._waitMessage(conn, msg_id,
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
conn.lock()
try:
......@@ -481,14 +481,14 @@ class Application(object):
Packets.AskPartitionTable([]))
finally:
conn.unlock()
self._waitMessage(conn, msg_id,
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
ready = self.uuid is not None and self.pt is not None \
and self.pt.operational()
logging.info("connected to primary master node %s" %
logging.info("connected to primary master node %s" %
self.primary_master_node)
return conn
def registerDB(self, db, limit):
self._db = db
......@@ -580,9 +580,9 @@ class Application(object):
break
if self.local_var.asked_object == 0:
# We didn't got any object from all storage node because of
# We didn't got any object from all storage node because of
# connection error
logging.warning('oid %s not found because of connection failure',
logging.warning('oid %s not found because of connection failure',
dump(oid))
raise NEOStorageNotFoundError()
......@@ -658,7 +658,7 @@ class Application(object):
self._askPrimary(Packets.AskBeginTransaction(tid))
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
self.local_var.txn = transaction
self.local_var.txn = transaction
def store(self, oid, serial, data, version, transaction):
......@@ -680,7 +680,7 @@ class Application(object):
self.local_var.object_stored_counter = 0
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is None:
if conn is None:
continue
self.local_var.object_stored = 0
......@@ -696,20 +696,20 @@ class Application(object):
if self.local_var.data_dict.has_key(oid):
# One storage already accept the object, is it normal ??
# remove from dict and raise ConflictError, don't care of
# previous node which already store data as it would be
# resent again if conflict is resolved or txn will be
# previous node which already store data as it would be
# resent again if conflict is resolved or txn will be
# aborted
del self.local_var.data_dict[oid]
self.local_var.conflict_serial = self.local_var.object_stored[1]
raise NEOStorageConflictError
# increase counter so that we know if a node has stored the object
# increase counter so that we know if a node has stored the object
# or not
self.local_var.object_stored_counter += 1
if self.local_var.object_stored_counter == 0:
# no storage nodes were available
raise NEOStorageError('tpc_store failed')
# Store object in tmp cache
self.local_var.data_dict[oid] = data
......@@ -729,14 +729,14 @@ class Application(object):
cell_list = self._getCellListForTID(self.local_var.tid, writable=True)
self.local_var.voted_counter = 0
for cell in cell_list:
logging.debug("voting object %s %s" %(cell.getAddress(),
logging.debug("voting object %s %s" %(cell.getAddress(),
cell.getState()))
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
self.local_var.txn_voted = False
p = Packets.AskStoreTransaction(self.local_var.tid,
p = Packets.AskStoreTransaction(self.local_var.tid,
user, desc, ext, oid_list)
try:
self._askStorage(conn, p)
......@@ -761,7 +761,7 @@ class Application(object):
for oid in self.local_var.data_dict.iterkeys():
cell_set |= set(self._getCellListForOID(oid, writable=True))
# select nodes where transaction was stored
cell_set |= set(self._getCellListForTID(self.local_var.tid,
cell_set |= set(self._getCellListForTID(self.local_var.tid,
writable=True))
# cancel transaction one all those nodes
......@@ -869,12 +869,12 @@ class Application(object):
self.store(oid, transaction_id, data, None, txn)
except NEOStorageConflictError, serial:
if serial <= self.local_var.tid:
new_data = wrapper.tryToResolveConflict(oid,
new_data = wrapper.tryToResolveConflict(oid,
self.local_var.tid, serial, data)
if new_data is not None:
self.store(oid, self.local_var.tid, new_data, None, txn)
continue
raise ConflictError(oid = oid, serials = (self.local_var.tid,
raise ConflictError(oid = oid, serials = (self.local_var.tid,
serial),
data = data)
return self.local_var.tid, oid_list
......@@ -897,7 +897,7 @@ class Application(object):
continue
try:
conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
protocol.INVALID_PARTITION))
finally:
conn.unlock()
......@@ -929,7 +929,7 @@ class Application(object):
if conn is not None:
self.local_var.txn_info = 0
try:
self._askStorage(conn,
self._askStorage(conn,
Packets.AskTransactionInformation(tid))
except ConnectionClosed:
continue
......@@ -951,7 +951,7 @@ class Application(object):
# Check we return at least one element, otherwise call
# again but extend offset
if len(undo_info) == 0 and not block:
undo_info = self.undoLog(first=first, last=last*5, filter=filter,
undo_info = self.undoLog(first=first, last=last*5, filter=filter,
block=1)
return undo_info
......@@ -1006,7 +1006,7 @@ class Application(object):
# ask transaction information
self.local_var.txn_info = None
try:
self._askStorage(conn,
self._askStorage(conn,
Packets.AskTransactionInformation(serial))
except ConnectionClosed:
continue
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -21,7 +21,7 @@ class NeoStorage(BaseConfig):
def open(self):
from Storage import Storage
return Storage(master_nodes=self.config.master_nodes,
return Storage(master_nodes=self.config.master_nodes,
name=self.config.name, connector = self.config.connector)
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -17,11 +17,11 @@
from ZODB import POSException
class NEOStorageError(POSException.StorageError):
class NEOStorageError(POSException.StorageError):
pass
class NEOStorageConflictError(NEOStorageError):
class NEOStorageConflictError(NEOStorageError):
pass
class NEOStorageNotFoundError(NEOStorageError):
class NEOStorageNotFoundError(NEOStorageError):
pass
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -54,7 +54,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
# got an uuid from the primary master
app.uuid = your_uuid
# Always create partition table
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
def answerPrimary(self, conn, packet, primary_uuid,
......@@ -85,13 +85,13 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None
conn.close()
def answerPartitionTable(self, conn, packet, ptid, row_list):
pass
def answerNodeInformation(self, conn, packet):
pass
......@@ -106,7 +106,7 @@ class PrimaryNotificationsHandler(BaseHandler):
app.master_conn = None
app.primary_master_node = None
else:
logging.warn('app.master_conn is %s, but we are closing %s',
logging.warn('app.master_conn is %s, but we are closing %s',
app.master_conn, conn)
super(PrimaryNotificationsHandler, self).connectionClosed(conn)
......@@ -146,7 +146,7 @@ class PrimaryNotificationsHandler(BaseHandler):
finally:
app._cache_lock_release()
# For the two methods below, we must not use app._getPartitionTable()
# For the two methods below, we must not use app._getPartitionTable()
# to avoid a dead lock. It is safe to not check the master connection
# because it's in the master handler, so the connection is already
# established.
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -29,7 +29,7 @@ class StorageEventHandler(BaseHandler):
assert node is not None
# Remove from pool connection
app.cp.removeConnection(node)
app.dispatcher.unregister(conn)
app.dispatcher.unregister(conn)
def connectionLost(self, conn, new_state):
self._dealWithStorageFailure(conn)
......@@ -51,7 +51,7 @@ class StorageBootstrapHandler(AnswerBaseHandler):
def notReady(self, conn, packet, message):
app = self.app
app.setNodeNotReady()
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
app = self.app
......@@ -74,11 +74,11 @@ class StorageBootstrapHandler(AnswerBaseHandler):
class StorageAnswersHandler(AnswerBaseHandler):
""" Handle all messages related to ZODB operations """
def answerObject(self, conn, packet, oid, start_serial, end_serial,
def answerObject(self, conn, packet, oid, start_serial, end_serial,
compression, checksum, data):
app = self.app
app.local_var.asked_object = (oid, start_serial, end_serial,
app.local_var.asked_object = (oid, start_serial, end_serial,
compression, checksum, data)
def answerStoreObject(self, conn, packet, conflicting, oid, serial):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......
......@@ -37,12 +37,12 @@ class Element(object):
This class defines an element of a FIFO buffer.
"""
pass
class FIFO(object):
"""
This class implements a FIFO buffer.
"""
def __init__(self):
self._head = None
self._tail = None
......@@ -56,10 +56,10 @@ class FIFO(object):
self.element = None
self.key = None
self.expire_time = None
def __len__(self):
return self._len
def append(self):
element = Element()
element.next = None
......@@ -74,10 +74,10 @@ class FIFO(object):
def head(self):
return self._head
def tail(self):
return self._tail
def shift(self):
element = self._head
if element is None:
......@@ -86,26 +86,26 @@ class FIFO(object):
del element.next
del element.prev
return element
def __delitem__(self, element):
if element.next is None:
self._tail = element.prev
else:
element.next.prev = element.prev
if element.prev is None:
self._head = element.next
else:
element.prev.next = element.next
self._len -= 1
self._len -= 1
class Data(object):
"""
Data for each element in a FIFO buffer.
"""
pass
def sizeof(o):
"""This function returns the estimated size of an object."""
if isinstance(o, tuple):
......@@ -116,22 +116,22 @@ def sizeof(o):
class MQ(object):
"""
This class manages cached data by a variant of Multi-Queue.
This class caches various sizes of objects. Here are some considerations:
- Expired objects are not really deleted immediately. But if GC is invoked too often,
it degrades the performance significantly.
- If large objects are cached, the number of cached objects decreases. This might affect
the cache hit ratio. It might be better to tweak a buffer level according to the size of
an object.
- Stored values must be strings.
- The size calculation is not accurate.
"""
def __init__(self, life_time=10000, buffer_levels=9,
def __init__(self, life_time=10000, buffer_levels=9,
max_history_size=100000, max_size=20*1024*1024):
self._history_buffer = FIFO()
self._cache_buffers = []
......@@ -144,16 +144,16 @@ class MQ(object):
self._max_history_size = max_history_size
self._max_size = max_size
self._size = 0
def has_key(self, key):
if key in self._data:
data = self._data[key]
if data.level >= 0:
return 1
return 0
__contains__ = has_key
def fetch(self, key):
"""
Fetch a value associated with the key.
......@@ -165,15 +165,15 @@ class MQ(object):
self.store(key, value)
return value
raise KeyError(key)
__getitem__ = fetch
def get(self, key, d=None):
try:
return self.fetch(key)
except KeyError:
return d
def _evict(self, key):
"""
Evict an element to the history buffer.
......@@ -190,7 +190,7 @@ class MQ(object):
if len(self._history_buffer) > self._max_history_size:
element = self._history_buffer.shift()
del self._data[element.data.key]
def store(self, key, value):
cache_buffers = self._cache_buffers
......@@ -203,8 +203,8 @@ class MQ(object):
del self._history_buffer[element]
except KeyError:
counter = 1
# XXX It might be better to adjust the level according to the object
# XXX It might be better to adjust the level according to the object
# size.
level = min(int(log(counter, 2)), self._buffer_levels - 1)
element = cache_buffers[level].append()
......@@ -219,7 +219,7 @@ class MQ(object):
self._data[key] = data
self._size += sizeof(value)
del value
self._time += 1
# Expire old elements.
......@@ -239,7 +239,7 @@ class MQ(object):
data.element = element
else:
self._evict(data.key)
# Limit the size.
size = self._size
max_size = self._max_size
......@@ -256,9 +256,9 @@ class MQ(object):
if size <= max_size:
break
self._size = size
__setitem__ = store
def invalidate(self, key):
if id in self._data:
data = self._data[key]
......@@ -269,14 +269,14 @@ class MQ(object):
raise KeyError, "%s was not found in the cache" % key
__delitem__ = invalidate
# Here is a test.
if __name__ == '__main__':
import hotshot, hotshot.stats
def test():
cache = MQ(life_time=100, buffer_levels=9, max_history_size=10000,
cache = MQ(life_time=100, buffer_levels=9, max_history_size=10000,
max_size=2*1024*1024)
for i in xrange(10000):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -20,7 +20,7 @@ from neo.util import bin, parseMasterList
class ConfigurationManager(object):
"""
"""
Configuration manager that load options from a configuration file and
command line arguments
"""
......@@ -52,7 +52,7 @@ class ConfigurationManager(object):
return []
# load master node list except itself
return parseMasterList(masters, except_node=self.getBind())
def getBind(self):
""" Get the address to bind to """
bind = self.__get('bind')
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -70,9 +70,9 @@ class BaseConnection(object):
if connector is not None:
self.connector_handler = connector.__class__
event_manager.register(self)
else:
else:
self.connector_handler = connector_handler
def lock(self):
return 1
......@@ -104,7 +104,7 @@ class BaseConnection(object):
if self.connector is not None:
em.removeReader(self)
em.removeWriter(self)
em.unregister(self)
em.unregister(self)
self.connector.shutdown()
self.connector.close()
self.connector = None
......@@ -202,7 +202,7 @@ class Connection(BaseConnection):
return next_id
def close(self):
logging.debug('closing a connector for %s (%s:%d)',
logging.debug('closing a connector for %s (%s:%d)',
dump(self.uuid), *(self.addr))
BaseConnection.close(self)
for event in self.event_dict.itervalues():
......@@ -213,7 +213,7 @@ class Connection(BaseConnection):
def abort(self):
"""Abort dealing with this connection."""
logging.debug('aborting a connector for %s (%s:%d)',
logging.debug('aborting a connector for %s (%s:%d)',
dump(self.uuid), *(self.addr))
self.aborted = True
......@@ -307,14 +307,14 @@ class Connection(BaseConnection):
self.handler.connectionClosed(self)
return
self.read_buf += data
except ConnectorTryAgainException:
except ConnectorTryAgainException:
pass
except ConnectorConnectionRefusedException:
# should only occur while connecting
self.close()
self.handler.connectionFailed(self)
except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error
# connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false
logging.debug('Connection reset by peer: %r', self.connector)
self.close()
......@@ -350,7 +350,7 @@ class Connection(BaseConnection):
# unhandled connector exception
self.close()
self.handler.connectionClosed(self)
raise
raise
def _addPacket(self, packet):
"""Add a packet into the write buffer."""
......@@ -405,8 +405,8 @@ class Connection(BaseConnection):
@not_closed
def ask(self, packet, timeout=5, additional_timeout=30):
"""
Send a packet with a new ID and register the expectation of an answer
"""
Send a packet with a new ID and register the expectation of an answer
"""
msg_id = self._getNextId()
packet.setId(msg_id)
......
#
# Copyright (C) 2009 Nexedi SA
#
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
......@@ -52,13 +52,13 @@ class SocketConnector:
self.remote_addr = accepted_from
self.is_listening = False
self.is_closed = False
if s is None:
if s is None: