Commit f32e9120 authored by Julien Muchembled's avatar Julien Muchembled

Fix a few errors found by 'pylint -E'

parent baddd320
...@@ -242,7 +242,6 @@ class BaseConnection(object): ...@@ -242,7 +242,6 @@ class BaseConnection(object):
""" """
KEEP_ALIVE = 60 KEEP_ALIVE = 60
_base_timeout = None
def __init__(self, event_manager, handler, connector, addr=None): def __init__(self, event_manager, handler, connector, addr=None):
assert connector is not None, "Need a low-level connector" assert connector is not None, "Need a low-level connector"
...@@ -261,28 +260,8 @@ class BaseConnection(object): ...@@ -261,28 +260,8 @@ class BaseConnection(object):
def cancelRequests(self, *args, **kw): def cancelRequests(self, *args, **kw):
return self._handlers.cancelRequests(self, *args, **kw) return self._handlers.cancelRequests(self, *args, **kw)
def updateTimeout(self, t=None):
if not self._queue:
if t:
self._base_timeout = t
self._timeout = self._handlers.getNextTimeout() or self.KEEP_ALIVE
def checkTimeout(self, t): def checkTimeout(self, t):
# first make sure we don't timeout on answers we already received pass
if self._base_timeout and not self._queue:
timeout = t - self._base_timeout
if self._timeout <= timeout:
handlers = self._handlers
if handlers.isPending():
msg_id = handlers.timeout(self)
if msg_id is None:
self._base_timeout = t
else:
logging.info('timeout for #0x%08x with %r',
msg_id, self)
self.close()
else:
self.idle()
def lock(self): def lock(self):
return 1 return 1
...@@ -407,6 +386,7 @@ class Connection(BaseConnection): ...@@ -407,6 +386,7 @@ class Connection(BaseConnection):
client = False client = False
server = False server = False
peer_id = None peer_id = None
_base_timeout = None
def __init__(self, event_manager, *args, **kw): def __init__(self, event_manager, *args, **kw):
BaseConnection.__init__(self, event_manager, *args, **kw) BaseConnection.__init__(self, event_manager, *args, **kw)
...@@ -472,6 +452,29 @@ class Connection(BaseConnection): ...@@ -472,6 +452,29 @@ class Connection(BaseConnection):
self.cur_id = (next_id + 1) & 0xffffffff self.cur_id = (next_id + 1) & 0xffffffff
return next_id return next_id
def updateTimeout(self, t=None):
if not self._queue:
if t:
self._base_timeout = t
self._timeout = self._handlers.getNextTimeout() or self.KEEP_ALIVE
def checkTimeout(self, t):
# first make sure we don't timeout on answers we already received
if self._base_timeout and not self._queue:
timeout = t - self._base_timeout
if self._timeout <= timeout:
handlers = self._handlers
if handlers.isPending():
msg_id = handlers.timeout(self)
if msg_id is None:
self._base_timeout = t
else:
logging.info('timeout for #0x%08x with %r',
msg_id, self)
self.close()
else:
self.idle()
def abort(self): def abort(self):
"""Abort dealing with this connection.""" """Abort dealing with this connection."""
logging.debug('aborting a connector for %r', self) logging.debug('aborting a connector for %r', self)
......
...@@ -57,7 +57,7 @@ class PacketRecord(object): ...@@ -57,7 +57,7 @@ class PacketRecord(object):
__init__ = property(lambda self: self.__dict__.update) __init__ = property(lambda self: self.__dict__.update)
class logging(Logger): class NEOLogger(Logger):
default_root_handler = StreamHandler() default_root_handler = StreamHandler()
default_root_handler.setFormatter(_Formatter(FMT)) default_root_handler.setFormatter(_Formatter(FMT))
...@@ -224,5 +224,5 @@ class logging(Logger): ...@@ -224,5 +224,5 @@ class logging(Logger):
msg=packet._body)) msg=packet._body))
logging = logging() logging = NEOLogger()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush()) signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
...@@ -350,7 +350,7 @@ class NodeManager(object): ...@@ -350,7 +350,7 @@ class NodeManager(object):
if master_db is not None: if master_db is not None:
self._master_db = db = MasterDB(master_db) self._master_db = db = MasterDB(master_db)
for addr in db: for addr in db:
self.createMaster(addr) self.createMaster(address=addr)
close = __init__ close = __init__
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
import neo import neo
from neo.lib import logging from neo.lib import logging
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.protocol import NodeTypes, Packets, ProtocolError, NodeStates, \ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets, \
ClusterStates NotReadyError, ProtocolError
from . import MasterHandler from . import MasterHandler
class IdentificationHandler(MasterHandler): class IdentificationHandler(MasterHandler):
...@@ -65,7 +65,7 @@ class IdentificationHandler(MasterHandler): ...@@ -65,7 +65,7 @@ class IdentificationHandler(MasterHandler):
handler = app.administration_handler handler = app.administration_handler
human_readable_node_type = 'n admin ' human_readable_node_type = 'n admin '
else: else:
raise NotImplementedError(node_type) assert False, node_type
while not app.isValidUUID(uuid, address): while not app.isValidUUID(uuid, address):
uuid = app.getNewUUID(node_type) uuid = app.getNewUUID(node_type)
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from .neoctl import NeoCTL, NotReadyException from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import bin, dump from neo.lib.util import bin, dump, p64
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, ZERO_TID from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, ZERO_TID
action_dict = { action_dict = {
......
...@@ -122,7 +122,7 @@ class Application(object): ...@@ -122,7 +122,7 @@ class Application(object):
dm.setName(self.name) dm.setName(self.name)
elif name != self.name: elif name != self.name:
raise RuntimeError('name %r does not match with the database: %r' raise RuntimeError('name %r does not match with the database: %r'
% (self.name, dm_name)) % (self.name, name))
# load configuration # load configuration
self.uuid = dm.getUUID() self.uuid = dm.getUUID()
......
...@@ -96,7 +96,7 @@ class StorageOperationHandler(EventHandler): ...@@ -96,7 +96,7 @@ class StorageOperationHandler(EventHandler):
deleteObject = self.app.dm.deleteObject deleteObject = self.app.dm.deleteObject
for serial, oid_list in object_dict.iteritems(): for serial, oid_list in object_dict.iteritems():
for oid in oid_list: for oid in oid_list:
delObject(oid, serial) deleteObject(oid, serial)
assert not pack_tid, "TODO" assert not pack_tid, "TODO"
if next_tid: if next_tid:
self.app.replicator.fetchObjects(next_tid, next_oid) self.app.replicator.fetchObjects(next_tid, next_oid)
......
...@@ -334,7 +334,7 @@ class TransactionManager(object): ...@@ -334,7 +334,7 @@ class TransactionManager(object):
lock_ttid = self._load_lock_dict.pop(oid, None) lock_ttid = self._load_lock_dict.pop(oid, None)
assert lock_ttid in (ttid, None), 'Transaction %s tried to ' \ assert lock_ttid in (ttid, None), 'Transaction %s tried to ' \
'release the lock on oid %s, but it was held by %s' % ( 'release the lock on oid %s, but it was held by %s' % (
dump(ttid), dump(oid), dump(lock_tid)) dump(ttid), dump(oid), dump(lock_ttid))
write_locking_tid = self._store_lock_dict.pop(oid) write_locking_tid = self._store_lock_dict.pop(oid)
assert write_locking_tid == ttid, 'Inconsistent locking state: ' \ assert write_locking_tid == ttid, 'Inconsistent locking state: ' \
'aborting %s:%s but %s has the lock.' % (dump(ttid), dump(oid), 'aborting %s:%s but %s has the lock.' % (dump(ttid), dump(oid),
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import __builtin__ import __builtin__
import errno
import os import os
import random import random
import socket import socket
...@@ -28,7 +29,7 @@ from mock import Mock ...@@ -28,7 +29,7 @@ from mock import Mock
from neo.lib import debug, logging, protocol from neo.lib import debug, logging, protocol
from neo.lib.protocol import Packets from neo.lib.protocol import Packets
from neo.lib.util import getAddressType from neo.lib.util import getAddressType
from time import time, gmtime from time import time
from struct import pack, unpack from struct import pack, unpack
DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo') DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo')
...@@ -217,27 +218,8 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -217,27 +218,8 @@ class NeoUnitTestBase(NeoTestBase):
return self._makeUUID('A') return self._makeUUID('A')
def getNextTID(self, ltid=None): def getNextTID(self, ltid=None):
tm = time() from ZODB.utils import newTid
gmt = gmtime(tm) return newTid(ltid)
upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
+ gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
tid = pack('!LL', upper, lower)
if ltid is not None and tid <= ltid:
upper, lower = unpack('!LL', self._last_tid)
if lower == 0xffffffff:
# This should not happen usually.
from datetime import timedelta, datetime
d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday,
gmt.tm_hour, gmt.tm_min) \
+ timedelta(0, 60)
upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
+ d.day - 1) * 24 + d.hour) * 60 + d.minute
lower = 0
else:
lower += 1
tid = pack('!LL', upper, lower)
return tid
def getPTID(self, i=None): def getPTID(self, i=None):
""" Return an integer PTID """ """ Return an integer PTID """
...@@ -489,18 +471,12 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -489,18 +471,12 @@ class NeoUnitTestBase(NeoTestBase):
def checkAnswerObjectHistory(self, conn, **kw): def checkAnswerObjectHistory(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerObjectHistory, **kw) return self.checkAnswerPacket(conn, Packets.AnswerObjectHistory, **kw)
def checkAnswerObjectHistoryFrom(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerObjectHistoryFrom, **kw)
def checkAnswerStoreTransaction(self, conn, **kw): def checkAnswerStoreTransaction(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerStoreTransaction, **kw) return self.checkAnswerPacket(conn, Packets.AnswerStoreTransaction, **kw)
def checkAnswerStoreObject(self, conn, **kw): def checkAnswerStoreObject(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerStoreObject, **kw) return self.checkAnswerPacket(conn, Packets.AnswerStoreObject, **kw)
def checkAnswerOids(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerOIDs, **kw)
def checkAnswerPartitionTable(self, conn, **kw): def checkAnswerPartitionTable(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerPartitionTable, **kw) return self.checkAnswerPacket(conn, Packets.AnswerPartitionTable, **kw)
......
...@@ -5,8 +5,8 @@ import smtplib ...@@ -5,8 +5,8 @@ import smtplib
import optparse import optparse
import platform import platform
import datetime import datetime
from email.MIMEMultipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.MIMEText import MIMEText from email.mime.text import MIMEText
MAIL_SERVER = '127.0.0.1:25' MAIL_SERVER = '127.0.0.1:25'
......
...@@ -32,7 +32,7 @@ MARKER = [] ...@@ -32,7 +32,7 @@ MARKER = []
class StorageBootstrapHandlerTests(NeoUnitTestBase): class StorageBootstrapHandlerTests(NeoUnitTestBase):
def setUp(self): def setUp(self):
super(NeoUnitTestBase, self).setUp() super(StorageBootstrapHandlerTests, self).setUp()
self.app = Mock() self.app = Mock()
self.app.nm = NodeManager() self.app.nm = NodeManager()
self.handler = StorageBootstrapHandler(self.app) self.handler = StorageBootstrapHandler(self.app)
......
...@@ -22,7 +22,6 @@ from persistent import Persistent ...@@ -22,7 +22,6 @@ from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest from . import NEOCluster, NEOFunctionalTest
from neo.lib.protocol import ClusterStates, NodeStates from neo.lib.protocol import ClusterStates, NodeStates
from ZODB.tests.StorageTestBase import zodb_pickle from ZODB.tests.StorageTestBase import zodb_pickle
import MySQLdb, sqlite3
from MySQLdb.constants.ER import NO_SUCH_TABLE from MySQLdb.constants.ER import NO_SUCH_TABLE
class PObject(Persistent): class PObject(Persistent):
...@@ -121,25 +120,6 @@ class StorageTests(NEOFunctionalTest): ...@@ -121,25 +120,6 @@ class StorageTests(NEOFunctionalTest):
storage_list = self.neo.getStorageList(NodeStates.RUNNING) storage_list = self.neo.getStorageList(NodeStates.RUNNING)
self.assertEqual(len(storage_list), 2) self.assertEqual(len(storage_list), 2)
def __checkReplicateCount(self, db_name, target_count, timeout=0, delay=1):
db = self.neo.getSQLConnection(db_name, autocommit=True)
def callback(last_try):
replicate_count = 0
try:
replicate_count = self.queryCount(db,
'select count(distinct uuid) from pt')
except MySQLdb.ProgrammingError, e:
if e[0] != NO_SUCH_TABLE:
raise
except sqlite3.OperationalError, e:
if not e[0].startswith('no such table:'):
raise
if last_try is not None and last_try < replicate_count:
raise AssertionError, 'Regression: %s became %s' % \
(last_try, replicate_count)
return replicate_count == target_count, replicate_count
self.neo.expectCondition(callback, timeout, delay)
def testNewNodesInPendingState(self): def testNewNodesInPendingState(self):
""" Check that new storage nodes are set as pending, the cluster remains """ Check that new storage nodes are set as pending, the cluster remains
running """ running """
......
...@@ -49,9 +49,6 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -49,9 +49,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
address=self.storage_address, address=self.storage_address,
) )
def getLastUUID(self):
return self.uuid
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1", def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021): port=10021):
"""Do first step of identification to MN """ """Do first step of identification to MN """
......
...@@ -44,9 +44,6 @@ class MasterRecoveryTests(NeoUnitTestBase): ...@@ -44,9 +44,6 @@ class MasterRecoveryTests(NeoUnitTestBase):
self.storage_address = ('127.0.0.1', self.storage_port) self.storage_address = ('127.0.0.1', self.storage_port)
# Common methods # Common methods
def getLastUUID(self):
return self.uuid
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1", def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021): port=10021):
"""Do first step of identification to MN """Do first step of identification to MN
......
...@@ -55,9 +55,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -55,9 +55,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
return self.identifyToMasterNode(node_type=NodeTypes.STORAGE, return self.identifyToMasterNode(node_type=NodeTypes.STORAGE,
ip='127.0.0.1', port=self._allocatePort()) ip='127.0.0.1', port=self._allocatePort())
def getLastUUID(self):
return self.uuid
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1", def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021): port=10021):
"""Do first step of identification to MN """Do first step of identification to MN
......
...@@ -45,9 +45,6 @@ class MasterVerificationTests(NeoUnitTestBase): ...@@ -45,9 +45,6 @@ class MasterVerificationTests(NeoUnitTestBase):
self.storage_address = ('127.0.0.1', self.storage_port) self.storage_address = ('127.0.0.1', self.storage_port)
# Common methods # Common methods
def getLastUUID(self):
return self.uuid
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1", def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021): port=10021):
"""Do first step of identification to MN """Do first step of identification to MN
......
...@@ -47,10 +47,6 @@ class StorageInitializationHandlerTests(NeoUnitTestBase): ...@@ -47,10 +47,6 @@ class StorageInitializationHandlerTests(NeoUnitTestBase):
del self.app del self.app
super(StorageInitializationHandlerTests, self)._tearDown(success) super(StorageInitializationHandlerTests, self)._tearDown(success)
# Common methods
def getLastUUID(self):
return self.uuid
def getClientConnection(self): def getClientConnection(self):
address = ("127.0.0.1", self.client_port) address = ("127.0.0.1", self.client_port)
return self.getFakeConnection(uuid=self.getNewUUID(), address=address) return self.getFakeConnection(uuid=self.getNewUUID(), address=address)
......
...@@ -27,12 +27,6 @@ from neo.lib.protocol import INVALID_TID, INVALID_OID ...@@ -27,12 +27,6 @@ from neo.lib.protocol import INVALID_TID, INVALID_OID
class StorageMasterHandlerTests(NeoUnitTestBase): class StorageMasterHandlerTests(NeoUnitTestBase):
def checkHandleUnexpectedPacket(self, _call, _msg_type, _listening=True, **kwargs):
conn = self.getMasterConnection(is_server=_listening)
# hook
self.operation.peerBroken = lambda c: c.peerBrokendCalled()
self.checkUnexpectedPacketRaised(_call, conn=conn, **kwargs)
def setUp(self): def setUp(self):
NeoUnitTestBase.setUp(self) NeoUnitTestBase.setUp(self)
self.prepareDatabase(number=1) self.prepareDatabase(number=1)
......
...@@ -43,7 +43,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -43,7 +43,7 @@ class StorageDBTests(NeoUnitTestBase):
pass pass
NeoUnitTestBase._tearDown(self, success) NeoUnitTestBase._tearDown(self, success)
def getDB(self): def getDB(self, reset=0):
raise NotImplementedError raise NotImplementedError
def setNumPartitions(self, num_partitions, reset=0): def setNumPartitions(self, num_partitions, reset=0):
......
...@@ -49,9 +49,6 @@ class StorageVerificationHandlerTests(NeoUnitTestBase): ...@@ -49,9 +49,6 @@ class StorageVerificationHandlerTests(NeoUnitTestBase):
super(StorageVerificationHandlerTests, self)._tearDown(success) super(StorageVerificationHandlerTests, self)._tearDown(success)
# Common methods # Common methods
def getLastUUID(self):
return self.uuid
def getClientConnection(self): def getClientConnection(self):
address = ("127.0.0.1", self.client_port) address = ("127.0.0.1", self.client_port)
return self.getFakeConnection(uuid=self.getNewUUID(), address=address) return self.getFakeConnection(uuid=self.getNewUUID(), address=address)
......
...@@ -41,10 +41,6 @@ class BootstrapManagerTests(NeoUnitTestBase): ...@@ -41,10 +41,6 @@ class BootstrapManagerTests(NeoUnitTestBase):
del self.app del self.app
super(BootstrapManagerTests, self)._tearDown(success) super(BootstrapManagerTests, self)._tearDown(success)
# Common methods
def getLastUUID(self):
return self.uuid
# Tests # Tests
def testConnectionCompleted(self): def testConnectionCompleted(self):
address=("127.0.0.1", self.master_port) address=("127.0.0.1", self.master_port)
......
...@@ -330,11 +330,9 @@ class MasterDBTests(NeoUnitTestBase): ...@@ -330,11 +330,9 @@ class MasterDBTests(NeoUnitTestBase):
""" """
temp_dir = getTempDirectory() temp_dir = getTempDirectory()
directory = join(temp_dir, 'read_only') directory = join(temp_dir, 'read_only')
assert not exists(directory), db_file
db_file = join(directory, 'not_created') db_file = join(directory, 'not_created')
mkdir(directory) mkdir(directory, 0400)
try: try:
chmod(directory, 0400)
self.assertRaises(IOError, MasterDB, db_file) self.assertRaises(IOError, MasterDB, db_file)
finally: finally:
rmdir(directory) rmdir(directory)
...@@ -345,7 +343,6 @@ class MasterDBTests(NeoUnitTestBase): ...@@ -345,7 +343,6 @@ class MasterDBTests(NeoUnitTestBase):
""" """
temp_dir = getTempDirectory() temp_dir = getTempDirectory()
directory = join(temp_dir, 'read_write') directory = join(temp_dir, 'read_write')
assert not exists(directory), db_file
db_file = join(directory, 'db') db_file = join(directory, 'db')
mkdir(directory) mkdir(directory)
try: try:
...@@ -370,7 +367,6 @@ class MasterDBTests(NeoUnitTestBase): ...@@ -370,7 +367,6 @@ class MasterDBTests(NeoUnitTestBase):
def testPersistence(self): def testPersistence(self):
temp_dir = getTempDirectory() temp_dir = getTempDirectory()
directory = join(temp_dir, 'read_write') directory = join(temp_dir, 'read_write')
assert not exists(directory), db_file
db_file = join(directory, 'db') db_file = join(directory, 'db')
mkdir(directory) mkdir(directory)
try: try:
......
...@@ -467,7 +467,6 @@ class NEOCluster(object): ...@@ -467,7 +467,6 @@ class NEOCluster(object):
SocketConnector.makeListeningConnection) SocketConnector.makeListeningConnection)
SocketConnector_receive = staticmethod(SocketConnector.receive) SocketConnector_receive = staticmethod(SocketConnector.receive)
SocketConnector_send = staticmethod(SocketConnector.send) SocketConnector_send = staticmethod(SocketConnector.send)
Storage__init__ = staticmethod(Storage.__init__)
_patch_count = 0 _patch_count = 0
_resource_dict = weakref.WeakValueDictionary() _resource_dict = weakref.WeakValueDictionary()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment