Commit 8a9fed79 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Wrap lines to fit in 80 columns.


git-svn-id: https://svn.erp5.org/repos/neo/trunk@1365 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 007ab1e5
...@@ -27,12 +27,14 @@ class AdminEventHandler(EventHandler): ...@@ -27,12 +27,14 @@ class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster.""" """This class deals with events for administrating cluster."""
def askPartitionList(self, conn, packet, min_offset, max_offset, uuid): def askPartitionList(self, conn, packet, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s" %(min_offset, max_offset, dump(uuid))) logging.info("ask partition list from %s to %s for %s" %
(min_offset, max_offset, dump(uuid)))
app = self.app app = self.app
# check we have one pt otherwise ask it to PMN # check we have one pt otherwise ask it to PMN
if app.pt is None: if app.pt is None:
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.') raise protocol.NotReadyError('Not connected to a primary ' \
'master.')
p = Packets.AskPartitionTable([]) p = Packets.AskPartitionTable([])
msg_id = self.app.master_conn.ask(p) msg_id = self.app.master_conn.ask(p)
app.dispatcher.register(msg_id, conn, app.dispatcher.register(msg_id, conn,
...@@ -41,7 +43,8 @@ class AdminEventHandler(EventHandler): ...@@ -41,7 +43,8 @@ class AdminEventHandler(EventHandler):
'uuid' : uuid, 'uuid' : uuid,
'msg_id' : packet.getId()}) 'msg_id' : packet.getId()})
else: else:
app.sendPartitionTable(conn, min_offset, max_offset, uuid, packet.getId()) app.sendPartitionTable(conn, min_offset, max_offset, uuid,
packet.getId())
def askNodeList(self, conn, packet, node_type): def askNodeList(self, conn, packet, node_type):
...@@ -89,10 +92,12 @@ class AdminEventHandler(EventHandler): ...@@ -89,10 +92,12 @@ class AdminEventHandler(EventHandler):
def askClusterState(self, conn, packet): def askClusterState(self, conn, packet):
if self.app.cluster_state is None: if self.app.cluster_state is None:
if self.app.master_conn is None: if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.') raise protocol.NotReadyError('Not connected to a primary ' \
'master.')
# required it from PMN first # required it from PMN first
msg_id = self.app.master_conn.ask(Packets.AskClusterState()) msg_id = self.app.master_conn.ask(Packets.AskClusterState())
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) self.app.dispatcher.register(msg_id, conn,
{'msg_id' : packet.getId()})
else: else:
conn.answer(Packets.AnswerClusterState(self.app.cluster_state), conn.answer(Packets.AnswerClusterState(self.app.cluster_state),
packet.getId()) packet.getId())
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
from ZODB import BaseStorage, ConflictResolution, POSException from ZODB import BaseStorage, ConflictResolution, POSException
from neo.client.app import Application from neo.client.app import Application
from neo.client.exception import NEOStorageConflictError, NEOStorageNotFoundError from neo.client.exception import NEOStorageConflictError, \
NEOStorageNotFoundError
class Storage(BaseStorage.BaseStorage, class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage): ConflictResolution.ConflictResolvingStorage):
...@@ -62,7 +63,8 @@ class Storage(BaseStorage.BaseStorage, ...@@ -62,7 +63,8 @@ class Storage(BaseStorage.BaseStorage,
def tpc_begin(self, transaction, tid=None, status=' '): def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
return self.app.tpc_begin(transaction=transaction, tid=tid, status=status) return self.app.tpc_begin(transaction=transaction, tid=tid,
status=status)
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
if self._is_read_only: if self._is_read_only:
......
This diff is collapsed.
...@@ -21,7 +21,7 @@ class NeoStorage(BaseConfig): ...@@ -21,7 +21,7 @@ class NeoStorage(BaseConfig):
def open(self): def open(self):
from Storage import Storage from Storage import Storage
return Storage(master_nodes = self.config.master_nodes, name = self.config.name, return Storage(master_nodes=self.config.master_nodes,
connector = self.config.connector) name=self.config.name, connector = self.config.connector)
...@@ -109,14 +109,16 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -109,14 +109,16 @@ class PrimaryNotificationsHandler(BaseHandler):
app.master_conn = None app.master_conn = None
app.primary_master_node = None app.primary_master_node = None
else: else:
logging.warn('app.master_conn is %s, but we are closing %s', app.master_conn, conn) logging.warn('app.master_conn is %s, but we are closing %s',
app.master_conn, conn)
super(PrimaryNotificationsHandler, self).connectionClosed(conn) super(PrimaryNotificationsHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
app = self.app app = self.app
if app.master_conn is not None: if app.master_conn is not None:
assert conn is app.master_conn assert conn is app.master_conn
logging.critical("connection timeout to primary master node expired") logging.critical("connection timeout to primary master node ' \
'expired")
BaseHandler.timeoutExpired(self, conn) BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
......
...@@ -78,8 +78,8 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -78,8 +78,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerObject(self, conn, packet, oid, start_serial, end_serial, def answerObject(self, conn, packet, oid, start_serial, end_serial,
compression, checksum, data): compression, checksum, data):
app = self.app app = self.app
app.local_var.asked_object = (oid, start_serial, end_serial, compression, app.local_var.asked_object = (oid, start_serial, end_serial,
checksum, data) compression, checksum, data)
def answerStoreObject(self, conn, packet, conflicting, oid, serial): def answerStoreObject(self, conn, packet, conflicting, oid, serial):
app = self.app app = self.app
......
...@@ -131,7 +131,8 @@ class MQ(object): ...@@ -131,7 +131,8 @@ class MQ(object):
- The size calculation is not accurate. - The size calculation is not accurate.
""" """
def __init__(self, life_time=10000, buffer_levels=9, max_history_size=100000, max_size=20*1024*1024): def __init__(self, life_time=10000, buffer_levels=9,
max_history_size=100000, max_size=20*1024*1024):
self._history_buffer = FIFO() self._history_buffer = FIFO()
self._cache_buffers = [] self._cache_buffers = []
for level in range(buffer_levels): for level in range(buffer_levels):
...@@ -203,7 +204,8 @@ class MQ(object): ...@@ -203,7 +204,8 @@ class MQ(object):
except KeyError: except KeyError:
counter = 1 counter = 1
# XXX It might be better to adjust the level according to the object size. # XXX It might be better to adjust the level according to the object
# size.
level = min(int(log(counter, 2)), self._buffer_levels - 1) level = min(int(log(counter, 2)), self._buffer_levels - 1)
element = cache_buffers[level].append() element = cache_buffers[level].append()
data = Data() data = Data()
......
...@@ -50,7 +50,9 @@ def lockCheckWrapper(func): ...@@ -50,7 +50,9 @@ def lockCheckWrapper(func):
def wrapper(self, *args, **kw): def wrapper(self, *args, **kw):
if not self._lock._is_owned(): if not self._lock._is_owned():
import traceback import traceback
logging.warning('%s called on %s instance without being locked. Stack:\n%s', func.func_code.co_name, self.__class__.__name__, ''.join(traceback.format_stack())) logging.warning('%s called on %s instance without being locked.' \
' Stack:\n%s', func.func_code.co_name, self.__class__.__name__,
''.join(traceback.format_stack()))
# Call anyway # Call anyway
return func(self, *args, **kw) return func(self, *args, **kw)
return wrapper return wrapper
...@@ -366,7 +368,7 @@ class Connection(BaseConnection): ...@@ -366,7 +368,7 @@ class Connection(BaseConnection):
if self.write_buf: if self.write_buf:
self.em.addWriter(self) self.em.addWriter(self)
def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30): def expectMessage(self, msg_id=None, timeout=5, additional_timeout=30):
"""Expect a message for a reply to a given message ID or any message. """Expect a message for a reply to a given message ID or any message.
The purpose of this method is to define how much amount of time is The purpose of this method is to define how much amount of time is
...@@ -403,7 +405,9 @@ class Connection(BaseConnection): ...@@ -403,7 +405,9 @@ class Connection(BaseConnection):
@not_closed @not_closed
def ask(self, packet, timeout=5, additional_timeout=30): def ask(self, packet, timeout=5, additional_timeout=30):
""" Send a packet with a new ID and register the expectation of an answer """ """
Send a packet with a new ID and register the expectation of an answer
"""
msg_id = self._getNextId() msg_id = self._getNextId()
packet.setId(msg_id) packet.setId(msg_id)
self.expectMessage(msg_id) self.expectMessage(msg_id)
......
...@@ -70,8 +70,8 @@ class SocketConnector: ...@@ -70,8 +70,8 @@ class SocketConnector:
raise ConnectorInProgressException raise ConnectorInProgressException
if err == errno.ECONNREFUSED: if err == errno.ECONNREFUSED:
raise ConnectorConnectionRefusedException raise ConnectorConnectionRefusedException
raise ConnectorException, 'makeClientConnection to %s failed: ' \ raise ConnectorException, 'makeClientConnection to %s failed:' \
'%s:%s' % (addr, err, errmsg) ' %s:%s' % (addr, err, errmsg)
finally: finally:
logging.debug('%r connecting to %r', self.socket.getsockname(), logging.debug('%r connecting to %r', self.socket.getsockname(),
addr) addr)
...@@ -85,15 +85,16 @@ class SocketConnector: ...@@ -85,15 +85,16 @@ class SocketConnector:
self.socket.listen(5) self.socket.listen(5)
except socket.error, (err, errmsg): except socket.error, (err, errmsg):
self.socket.close() self.socket.close()
raise ConnectorException, 'makeListeningConnection on %s failed: %s:%s' % \ raise ConnectorException, 'makeListeningConnection on %s failed:' \
(addr, err, errmsg) ' %s:%s' % (addr, err, errmsg)
def getError(self): def getError(self):
return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
def getDescriptor(self): def getDescriptor(self):
# this descriptor must only be used by the event manager, where it guarantee # this descriptor must only be used by the event manager, where it
# unicity only while the connector is opened and registered in epoll # guarantee unicity only while the connector is opened and registered
# in epoll
return self.socket.fileno() return self.socket.fileno()
def getNewConnection(self): def getNewConnection(self):
......
...@@ -22,8 +22,10 @@ from time import time ...@@ -22,8 +22,10 @@ from time import time
from neo.epoll import Epoll from neo.epoll import Epoll
class IdleEvent(object): class IdleEvent(object):
"""This class represents an event called when a connection is waiting for """
a message too long.""" This class represents an event called when a connection is waiting for
a message too long.
"""
def __init__(self, conn, msg_id, timeout, additional_timeout): def __init__(self, conn, msg_id, timeout, additional_timeout):
self._conn = conn self._conn = conn
...@@ -141,8 +143,8 @@ class SelectEventManager(object): ...@@ -141,8 +143,8 @@ class SelectEventManager(object):
self._addPendingConnection(to_process) self._addPendingConnection(to_process)
def _poll(self, timeout = 1): def _poll(self, timeout = 1):
rlist, wlist, xlist = select(self.reader_set, self.writer_set, self.exc_list, rlist, wlist, xlist = select(self.reader_set, self.writer_set,
timeout) self.exc_list, timeout)
for s in rlist: for s in rlist:
conn = self.connection_dict[s] conn = self.connection_dict[s]
conn.lock() conn.lock()
......
...@@ -37,7 +37,8 @@ class EventHandler(object): ...@@ -37,7 +37,8 @@ class EventHandler(object):
# if decoding fail, there's no packet instance # if decoding fail, there's no packet instance
logging.error('malformed packet from %s:%d: %s', *args) logging.error('malformed packet from %s:%d: %s', *args)
else: else:
logging.error('malformed packet %s from %s:%d: %s', packet.getType(), *args) logging.error('malformed packet %s from %s:%d: %s',
packet.getType(), *args)
response = protocol.protocolError(message) response = protocol.protocolError(message)
if packet is not None: if packet is not None:
conn.answer(response, packet.getId()) conn.answer(response, packet.getId())
...@@ -386,7 +387,8 @@ class EventHandler(object): ...@@ -386,7 +387,8 @@ class EventHandler(object):
d[Packets.StartOperation] = self.startOperation d[Packets.StartOperation] = self.startOperation
d[Packets.StopOperation] = self.stopOperation d[Packets.StopOperation] = self.stopOperation
d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions
d[Packets.AnswerUnfinishedTransactions] = self.answerUnfinishedTransactions d[Packets.AnswerUnfinishedTransactions] = \
self.answerUnfinishedTransactions
d[Packets.AskObjectPresent] = self.askObjectPresent d[Packets.AskObjectPresent] = self.askObjectPresent
d[Packets.AnswerObjectPresent] = self.answerObjectPresent d[Packets.AnswerObjectPresent] = self.answerObjectPresent
d[Packets.DeleteTransaction] = self.deleteTransaction d[Packets.DeleteTransaction] = self.deleteTransaction
...@@ -411,7 +413,8 @@ class EventHandler(object): ...@@ -411,7 +413,8 @@ class EventHandler(object):
d[Packets.AskTIDs] = self.askTIDs d[Packets.AskTIDs] = self.askTIDs
d[Packets.AnswerTIDs] = self.answerTIDs d[Packets.AnswerTIDs] = self.answerTIDs
d[Packets.AskTransactionInformation] = self.askTransactionInformation d[Packets.AskTransactionInformation] = self.askTransactionInformation
d[Packets.AnswerTransactionInformation] = self.answerTransactionInformation d[Packets.AnswerTransactionInformation] = \
self.answerTransactionInformation
d[Packets.AskObjectHistory] = self.askObjectHistory d[Packets.AskObjectHistory] = self.askObjectHistory
d[Packets.AnswerObjectHistory] = self.answerObjectHistory d[Packets.AnswerObjectHistory] = self.answerObjectHistory
d[Packets.AskOIDs] = self.askOIDs d[Packets.AskOIDs] = self.askOIDs
......
...@@ -48,7 +48,8 @@ class LockUser(object): ...@@ -48,7 +48,8 @@ class LockUser(object):
return isinstance(other, self.__class__) and self.ident == other.ident return isinstance(other, self.__class__) and self.ident == other.ident
def __repr__(self): def __repr__(self):
return '%s@%s:%s %s' % (self.ident, self.caller[0], self.caller[1], self.caller[3]) return '%s@%s:%s %s' % (self.ident, self.caller[0], self.caller[1],
self.caller[3])
def formatStack(self): def formatStack(self):
return ''.join(traceback.format_list(self.stack)) return ''.join(traceback.format_list(self.stack))
...@@ -59,7 +60,8 @@ class VerboseLockBase(object): ...@@ -59,7 +60,8 @@ class VerboseLockBase(object):
self.debug_lock = debug_lock self.debug_lock = debug_lock
self.owner = None self.owner = None
self.waiting = [] self.waiting = []
self._note('%s@%X created by %r', self.__class__.__name__, id(self), LockUser(1)) self._note('%s@%X created by %r', self.__class__.__name__, id(self),
LockUser(1))
def _note(self, fmt, *args): def _note(self, fmt, *args):
sys.stderr.write(fmt % args + '\n') sys.stderr.write(fmt % args + '\n')
...@@ -75,12 +77,16 @@ class VerboseLockBase(object): ...@@ -75,12 +77,16 @@ class VerboseLockBase(object):
def acquire(self, blocking=1): def acquire(self, blocking=1):
me = LockUser() me = LockUser()
owner = self._getOwner() owner = self._getOwner()
self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r Waiting:%r', me, self, blocking, owner, self.waiting) self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r ' \
if (self.debug_lock and owner is not None) or (not self.reentrant and blocking and me == owner): 'Waiting:%r', me, self, blocking, owner, self.waiting)
if (self.debug_lock and owner is not None) or \
(not self.reentrant and blocking and me == owner):
if me == owner: if me == owner:
self._note('[%r]%s.acquire(%s): Deadlock detected: I already own this lock:%r', me, self, blocking, owner) self._note('[%r]%s.acquire(%s): Deadlock detected: ' \
' I already own this lock:%r', me, self, blocking, owner)
else: else:
self._note('[%r]%s.acquire(%s): debug lock triggered: %r', me, self, blocking, owner) self._note('[%r]%s.acquire(%s): debug lock triggered: %r',
me, self, blocking, owner)
self._note('Owner traceback:\n%s', owner.formatStack()) self._note('Owner traceback:\n%s', owner.formatStack())
self._note('My traceback:\n%s', me.formatStack()) self._note('My traceback:\n%s', me.formatStack())
self.waiting.append(me) self.waiting.append(me)
...@@ -89,7 +95,8 @@ class VerboseLockBase(object): ...@@ -89,7 +95,8 @@ class VerboseLockBase(object):
finally: finally:
self.owner = me self.owner = me
self.waiting.remove(me) self.waiting.remove(me)
self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r', me, self, blocking, self.waiting) self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r',
me, self, blocking, self.waiting)
def release(self): def release(self):
me = LockUser() me = LockUser()
...@@ -104,7 +111,8 @@ class VerboseLockBase(object): ...@@ -104,7 +111,8 @@ class VerboseLockBase(object):
class VerboseRLock(VerboseLockBase): class VerboseRLock(VerboseLockBase):
def __init__(self, verbose=None, debug_lock=False): def __init__(self, verbose=None, debug_lock=False):
super(VerboseRLock, self).__init__(reentrant=True, debug_lock=debug_lock) super(VerboseRLock, self).__init__(reentrant=True,
debug_lock=debug_lock)
self.lock = threading_RLock() self.lock = threading_RLock()
def _locked(self): def _locked(self):
......
...@@ -26,8 +26,8 @@ from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets ...@@ -26,8 +26,8 @@ from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.node import NodeManager from neo.node import NodeManager
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \ from neo.exception import ElectionFailure, PrimaryFailure, \
OperationFailure VerificationFailure, OperationFailure
from neo.master.handlers import election, identification, secondary, recovery from neo.master.handlers import election, identification, secondary, recovery
from neo.master.handlers import verification, storage, client, shutdown from neo.master.handlers import verification, storage, client, shutdown
from neo.master.handlers import administration from neo.master.handlers import administration
...@@ -63,8 +63,9 @@ class Application(object): ...@@ -63,8 +63,9 @@ class Application(object):
if partitions <= 0: if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero' raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas) self.pt = PartitionTable(partitions, replicas)
logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s', logging.debug('the number of replicas is %d, the number of ' \
replicas, partitions, self.name) 'partitions is %d, the name is %s',
replicas, partitions, self.name)
self.listening_conn = None self.listening_conn = None
self.primary = None self.primary = None
...@@ -173,16 +174,18 @@ class Application(object): ...@@ -173,16 +174,18 @@ class Application(object):
t = current_time t = current_time
for node in nm.getMasterList(): for node in nm.getMasterList():
if node.isTemporarilyDown() \ if node.isTemporarilyDown() \
and node.getLastStateChange() + expiration < current_time: and node.getLastStateChange() + \
expiration < current_time:
logging.info('%s is down' % (node, )) logging.info('%s is down' % (node, ))
node.setDown() node.setDown()
self.unconnected_master_node_set.discard(node.getAddress()) self.unconnected_master_node_set.discard(
node.getAddress())
# Try to connect to master nodes. # Try to connect to master nodes.
if self.unconnected_master_node_set: if self.unconnected_master_node_set:
for addr in list(self.unconnected_master_node_set): for addr in list(self.unconnected_master_node_set):
ClientConnection(em, client_handler, addr = addr, ClientConnection(em, client_handler, addr=addr,
connector_handler = self.connector_handler) connector_handler=self.connector_handler)
em.poll(1) em.poll(1)
if len(self.unconnected_master_node_set) == 0 \ if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0: and len(self.negotiating_master_node_set) == 0:
...@@ -195,7 +198,7 @@ class Application(object): ...@@ -195,7 +198,7 @@ class Application(object):
if self.primary is None: if self.primary is None:
# I am the primary. # I am the primary.
self.primary = True self.primary = True
logging.debug('I am the primary, so sending an announcement') logging.debug('I am the primary, sending an announcement')
for conn in em.getClientList(): for conn in em.getClientList():
conn.notify(Packets.AnnouncePrimary()) conn.notify(Packets.AnnouncePrimary())
conn.abort() conn.abort()
...@@ -224,12 +227,14 @@ class Application(object): ...@@ -224,12 +227,14 @@ class Application(object):
if conn.getAddress() != addr: if conn.getAddress() != addr:
conn.close() conn.close()
# But if there is no such connection, something wrong happened. # But if there is no such connection, something wrong
# happened.
for conn in em.getClientList(): for conn in em.getClientList():
if conn.getAddress() == addr: if conn.getAddress() == addr:
break break
else: else:
raise ElectionFailure, 'no connection remains to the primary' raise ElectionFailure, 'no connection remains to ' \
'the primary'
return return
except ElectionFailure, m: except ElectionFailure, m:
...@@ -321,7 +326,8 @@ class Application(object): ...@@ -321,7 +326,8 @@ class Application(object):
row_list.append((offset, self.pt.getRow(offset))) row_list.append((offset, self.pt.getRow(offset)))
# Split the packet if too huge. # Split the packet if too huge.
if len(row_list) == 1000: if len(row_list) == 1000:
conn.notify(Packets.SendPartitionTable( self.pt.getID(), row_list)) conn.notify(Packets.SendPartitionTable(self.pt.getID(),
row_list))
del row_list[:] del row_list[:]
if row_list: if row_list:
conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list)) conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list))
...@@ -366,9 +372,12 @@ class Application(object): ...@@ -366,9 +372,12 @@ class Application(object):
pt.make(node_list) pt.make(node_list)
def recoverStatus(self): def recoverStatus(self):
"""Recover the status about the cluster. Obtain the last OID, the last TID, """
and the last Partition Table ID from storage nodes, then get back the latest Recover the status about the cluster. Obtain the last OID, the last
partition table or make a new table from scratch, if this is the first time.""" TID, and the last Partition Table ID from storage nodes, then get
back the latest partition table or make a new table from scratch,
if this is the first time.
"""
logging.info('begin the recovery of the status') logging.info('begin the recovery of the status')
self.changeClusterState(ClusterStates.RECOVERING) self.changeClusterState(ClusterStates.RECOVERING)
...@@ -545,16 +554,19 @@ class Application(object): ...@@ -545,16 +554,19 @@ class Application(object):
self.broadcastPartitionChanges(self.pt.setNextID(), cell_list) self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)
def provideService(self): def provideService(self):
"""This is the normal mode for a primary master node. Handle transactions """
This is the normal mode for a primary master node. Handle transactions
and stop the service only if a catastrophy happens or the user commits and stop the service only if a catastrophy happens or the user commits
a shutdown.""" a shutdown.
"""
logging.info('provide service') logging.info('provide service')
em = self.em em = self.em
nm = self.nm nm = self.nm
self.changeClusterState(ClusterStates.RUNNING) self.changeClusterState(ClusterStates.RUNNING)
# This dictionary is used to hold information on transactions being finished. # This dictionary is used to hold information on transactions being
# finished.
self.finishing_transaction_dict = {} self.finishing_transaction_dict = {}
# Now everything is passive. # Now everything is passive.
...@@ -562,12 +574,13 @@ class Application(object): ...@@ -562,12 +574,13 @@ class Application(object):
try: try:
em.poll(1) em.poll(1)
except OperationFailure: except OperationFailure:
# If not operational, send Stop Operation packets to storage nodes # If not operational, send Stop Operation packets to storage
# and client nodes. Abort connections to client nodes. # nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational, so stopping the service') logging.critical('No longer operational, stopping the service')
for conn in em.getConnectionList(): for conn in em.getConnectionList():
node = nm.getByUUID(conn.getUUID()) node = nm.getByUUID(conn.getUUID())
if node is not None and (node.isStorage() or node.isClient()): if node is not None and (node.isStorage()
or node.isClient()):
conn.notify(Packets.StopOperation()) conn.notify(Packets.StopOperation())
if node.isClient(): if node.isClient():
conn.abort() conn.abort()
...@@ -580,7 +593,8 @@ class Application(object): ...@@ -580,7 +593,8 @@ class Application(object):
dump(self.uuid), *(self.server)) dump(self.uuid), *(self.server))
# all incoming connections identify through this handler # all incoming connections identify through this handler
self.listening_conn.setHandler(identification.IdentificationHandler(self)) self.listening_conn.setHandler(
identification.IdentificationHandler(self))
handler = secondary.SecondaryMasterHandler(self) handler = secondary.SecondaryMasterHandler(self)
em = self.em em = self.em
...@@ -596,8 +610,8 @@ class Application(object): ...@@ -596,8 +610,8 @@ class Application(object):
conn.setHandler(handler) conn.setHandler(handler)
# If I know any storage node, make sure that they are not in the running state, # If I know any storage node, make sure that they are not in the
# because they are not connected at this stage. # running state, because they are not connected at this stage.
for node in nm.getStorageList(): for node in nm.getStorageList():
if node.isRunning(): if node.isRunning():
node.setTemporarilyDown() node.setTemporarilyDown()
...@@ -613,7 +627,9 @@ class Application(object): ...@@ -613,7 +627,9 @@ class Application(object):
self.provideService() self.provideService()
def playSecondaryRole(self): def playSecondaryRole(self):
"""I play a secondary role, thus only wait for a primary master to fail.""" """
I play a secondary role, thus only wait for a primary master to fail.
"""
logging.info('play the secondary role with %s (%s:%d)', logging.info('play the secondary role with %s (%s:%d)',
dump(self.uuid), *(self.server)) dump(self.uuid), *(self.server))
...@@ -631,7 +647,9 @@ class Application(object): ...@@ -631,7 +647,9 @@ class Application(object):
self.em.poll(1) self.em.poll(1)
def changeClusterState(self, state): def changeClusterState(self, state):
""" Change the cluster state and apply right handler on each connections """ """
Change the cluster state and apply right handler on each connections
"""
if self.cluster_state == state: if self.cluster_state == state:
return return
nm, em = self.nm, self.em nm, em = self.nm, self.em
......
...@@ -86,7 +86,8 @@ class BaseServiceHandler(MasterHandler): ...@@ -86,7 +86,8 @@ class BaseServiceHandler(MasterHandler):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
assert node is not None assert node is not None
if new_state != NodeStates.BROKEN: if new_state != NodeStates.BROKEN:
new_state = DISCONNECTED_STATE_DICT.get(node.getType(), NodeStates.DOWN) new_state = DISCONNECTED_STATE_DICT.get(node.getType(),
NodeStates.DOWN)
if node.getState() == new_state: if node.getState() == new_state:
return return
if new_state != NodeStates.BROKEN and node.isPending(): if new_state != NodeStates.BROKEN and node.isPending():
......
...@@ -43,7 +43,8 @@ class AdministrationHandler(MasterHandler): ...@@ -43,7 +43,8 @@ class AdministrationHandler(MasterHandler):
self.app.shutdown() self.app.shutdown()
def setNodeState(self, conn, packet, uuid, state, modify_partition_table): def setNodeState(self, conn, packet, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s" % (dump(uuid), state, modify_partition_table)) logging.info("set node state for %s-%s : %s" %
(dump(uuid), state, modify_partition_table))
app = self.app app = self.app
node = app.nm.getByUUID(uuid) node = app.nm.getByUUID(uuid)
if node is None: if node is None:
......
...@@ -111,7 +111,7 @@ class ClientServiceHandler(BaseServiceHandler): ...@@ -111,7 +111,7 @@ class ClientServiceHandler(BaseServiceHandler):
# Collect the UUIDs of nodes related to this transaction. # Collect the UUIDs of nodes related to this transaction.
uuid_set = set() uuid_set = set()
for part in partition_set: for part in partition_set:
uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part) \ uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN)) if cell.getNodeState() != NodeStates.HIDDEN))
# Request locking data. # Request locking data.
......
...@@ -57,7 +57,8 @@ class IdentificationHandler(MasterHandler): ...@@ -57,7 +57,8 @@ class IdentificationHandler(MasterHandler):
node.setAddress(address) node.setAddress(address)
node.setRunning() node.setRunning()
# ask the app the node identification, if refused, an exception is raised # ask the app the node identification, if refused, an exception is
# raised
result = self.app.identifyNode(node_type, uuid, node) result = self.app.identifyNode(node_type, uuid, node)
(uuid, node, state, handler, node_ctor) = result (uuid, node, state, handler, node_ctor) = result
if uuid is None: if uuid is None:
......
...@@ -46,8 +46,8 @@ class RecoveryHandler(MasterHandler): ...@@ -46,8 +46,8 @@ class RecoveryHandler(MasterHandler):
app = self.app app = self.app
if uuid != app.target_uuid: if uuid != app.target_uuid:
# If this is not from a target node, ignore it. # If this is not from a target node, ignore it.
logging.warn('got answer partition table from %s while waiting for %s', logging.warn('got answer partition table from %s while waiting ' \
dump(uuid), dump(app.target_uuid)) 'for %s', dump(uuid), dump(app.target_uuid))
return return
# load unknown storage nodes # load unknown storage nodes
for offset, row in row_list: for offset, row in row_list:
......
...@@ -44,11 +44,13 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -44,11 +44,13 @@ class StorageServiceHandler(BaseServiceHandler):
def askLastIDs(self, conn, packet): def askLastIDs(self, conn, packet):
app = self.app app = self.app
conn.answer(Packets.AnswerLastIDs(app.loid, app.ltid, app.pt.getID()), packet.getId()) conn.answer(Packets.AnswerLastIDs(app.loid, app.ltid, app.pt.getID()),
packet.getId())
def askUnfinishedTransactions(self, conn, packet): def askUnfinishedTransactions(self, conn, packet):
app = self.app app = self.app
p = Packets.AnswerUnfinishedTransactions(app.finishing_transaction_dict.keys()) p = Packets.AnswerUnfinishedTransactions(
app.finishing_transaction_dict.keys())
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
def notifyInformationLocked(self, conn, packet, tid): def notifyInformationLocked(self, conn, packet, tid):
...@@ -78,7 +80,8 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -78,7 +80,8 @@ class StorageServiceHandler(BaseServiceHandler):
p = Packets.NotifyTransactionFinished(tid) p = Packets.NotifyTransactionFinished(tid)
c.answer(p, t.getMessageId()) c.answer(p, t.getMessageId())
else: else:
p = Packets.InvalidateObjects(t.getOIDList(), tid) p = Packets.InvalidateObjects(t.getOIDList(),
tid)
c.notify(p) c.notify(p)
elif node.isStorage(): elif node.isStorage():
if uuid in t.getUUIDSet(): if uuid in t.getUUIDSet():
...@@ -107,16 +110,18 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -107,16 +110,18 @@ class StorageServiceHandler(BaseServiceHandler):
continue continue
offset = cell[0] offset = cell[0]
logging.debug("node %s is up for offset %s" %(dump(node.getUUID()), offset)) logging.debug("node %s is up for offset %s" %
(dump(node.getUUID()), offset))
# check the storage said it is up to date for a partition it was assigne to # check the storage said it is up to date for a partition it was
# assigne to
for xcell in app.pt.getCellList(offset): for xcell in app.pt.getCellList(offset):
if xcell.getNode().getUUID() == node.getUUID() and \ if xcell.getNode().getUUID() == node.getUUID() and \
xcell.getState() not in (CellStates.OUT_OF_DATE, xcell.getState() not in (CellStates.OUT_OF_DATE,
CellStates.UP_TO_DATE): CellStates.UP_TO_DATE):
msg = "node %s telling that it is UP TO DATE for offset \ msg = "node %s telling that it is UP TO DATE for offset \
%s but where %s for that offset" % (dump(node.getUUID()), offset, %s but where %s for that offset" % (dump(node.getUUID()),
xcell.getState()) offset, xcell.getState())
raise ProtocolError(msg) raise ProtocolError(msg)
......
...@@ -44,11 +44,11 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -44,11 +44,11 @@ class PartitionTable(neo.pt.PartitionTable):
and n.getUUID() is not None] and n.getUUID() is not None]
if len(node_list) == 0: if len(node_list) == 0:
# Impossible. # Impossible.
raise RuntimeError, \ raise RuntimeError, 'cannot make a partition table with an ' \
'cannot make a partition table with an empty storage node list' 'empty storage node list'
# Take it into account that the number of storage nodes may be less than the # Take it into account that the number of storage nodes may be less
# number of replicas. # than the number of replicas.
repeats = min(self.nr + 1, len(node_list)) repeats = min(self.nr + 1, len(node_list))
index = 0 index = 0
for offset in xrange(self.np): for offset in xrange(self.np):
...@@ -79,22 +79,23 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -79,22 +79,23 @@ class PartitionTable(neo.pt.PartitionTable):
cell_list = [] cell_list = []
uuid = node.getUUID() uuid = node.getUUID()
for offset, row in enumerate(self.partition_list): for offset, row in enumerate(self.partition_list):
if row is not None: if row is None:
for cell in row: continue
if cell.getNode() is node: for cell in row:
if not cell.isFeeding(): if cell.getNode() is node:
# If this cell is not feeding, find another node if not cell.isFeeding():
# to be added. # If this cell is not feeding, find another node
node_list = [c.getNode() for c in row] # to be added.
n = self.findLeastUsedNode(node_list) node_list = [c.getNode() for c in row]
if n is not None: n = self.findLeastUsedNode(node_list)
row.append(neo.pt.Cell(n, CellStates.OUT_OF_DATE)) if n is not None:
self.count_dict[n] += 1 row.append(neo.pt.Cell(n, CellStates.OUT_OF_DATE))
cell_list.append((offset, n.getUUID(), self.count_dict[n] += 1
CellStates.OUT_OF_DATE)) cell_list.append((offset, n.getUUID(),
row.remove(cell) CellStates.OUT_OF_DATE))
cell_list.append((offset, uuid, CellStates.DISCARDED)) row.remove(cell)
break cell_list.append((offset, uuid, CellStates.DISCARDED))
break
try: try:
del self.count_dict[node] del self.count_dict[node]
...@@ -135,7 +136,8 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -135,7 +136,8 @@ class PartitionTable(neo.pt.PartitionTable):
if num_cells <= self.nr: if num_cells <= self.nr:
row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE)) row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
cell_list.append((offset, node.getUUID(), CellStates.OUT_OF_DATE)) cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
node_count += 1 node_count += 1
else: else:
if max_count - node_count > 1: if max_count - node_count > 1:
...@@ -192,7 +194,8 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -192,7 +194,8 @@ class PartitionTable(neo.pt.PartitionTable):
removed_cell_list.append(feeding_cell) removed_cell_list.append(feeding_cell)
ideal_num = self.nr + 1 ideal_num = self.nr + 1
while len(out_of_date_cell_list) + len(up_to_date_cell_list) > ideal_num: while len(out_of_date_cell_list) + len(up_to_date_cell_list) > \
ideal_num:
# This row contains too many cells. # This row contains too many cells.
if len(up_to_date_cell_list) > 1: if len(up_to_date_cell_list) > 1:
# There are multiple up-to-date cells, so choose whatever # There are multiple up-to-date cells, so choose whatever
...@@ -220,7 +223,8 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -220,7 +223,8 @@ class PartitionTable(neo.pt.PartitionTable):
row.remove(cell) row.remove(cell)
if not cell.isFeeding(): if not cell.isFeeding():
self.count_dict[cell.getNode()] -= 1 self.count_dict[cell.getNode()] -= 1
changed_cell_list.append((offset, cell.getUUID(), CellStates.DISCARDED)) changed_cell_list.append((offset, cell.getUUID(),
CellStates.DISCARDED))
# Add cells, if a row contains less than the number of replicas. # Add cells, if a row contains less than the number of replicas.
for offset, row in enumerate(self.partition_list): for offset, row in enumerate(self.partition_list):
...@@ -233,7 +237,8 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -233,7 +237,8 @@ class PartitionTable(neo.pt.PartitionTable):
if node is None: if node is None:
break break
row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE)) row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
changed_cell_list.append((offset, node.getUUID(), CellStates.OUT_OF_DATE)) changed_cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
self.count_dict[node] += 1 self.count_dict[node] += 1
num_cells += 1 num_cells += 1
...@@ -251,6 +256,7 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -251,6 +256,7 @@ class PartitionTable(neo.pt.PartitionTable):
for cell in row: for cell in row:
if not cell.getNode().isRunning() and not cell.isOutOfDate(): if not cell.getNode().isRunning() and not cell.isOutOfDate():
cell.setState(CellStates.OUT_OF_DATE) cell.setState(CellStates.OUT_OF_DATE)
cell_list.append((offset, cell.getUUID(), CellStates.OUT_OF_DATE)) cell_list.append((offset, cell.getUUID(),
CellStates.OUT_OF_DATE))
return cell_list return cell_list
...@@ -193,10 +193,12 @@ class Application(object): ...@@ -193,10 +193,12 @@ class Application(object):
def execute(self, args): def execute(self, args):
"""Execute the command given.""" """Execute the command given."""
# print node type : print list of node of the given type (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...) # print node type : print list of node of the given type
# set node uuid state [1|0] : set the node for the given uuid to the state (RUNNING, DOWN...) # (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...)
# and modify the partition if asked # set node uuid state [1|0] : set the node for the given uuid to the
# set cluster name [shutdown|operational] : either shutdown the cluster or mark it as operational # state (RUNNING, DOWN...) and modify the partition if asked
# set cluster name [shutdown|operational] : either shutdown the
# cluster or mark it as operational
current_action = action_dict current_action = action_dict
level = 0 level = 0
while current_action is not None and \ while current_action is not None and \
......
...@@ -147,7 +147,8 @@ def _decodeNodeType(original_node_type): ...@@ -147,7 +147,8 @@ def _decodeNodeType(original_node_type):
def _decodeErrorCode(original_error_code): def _decodeErrorCode(original_error_code):
error_code = ErrorCodes.get(original_error_code) error_code = ErrorCodes.get(original_error_code)
if error_code is None: if error_code is None:
raise PacketMalformedError('invalid error code %d' % original_error_code) raise PacketMalformedError('invalid error code %d' %
original_error_code)
return error_code return error_code
def _decodeAddress(address): def _decodeAddress(address):
...@@ -337,7 +338,8 @@ class AcceptIdentification(Packet): ...@@ -337,7 +338,8 @@ class AcceptIdentification(Packet):
node_type = _decodeNodeType(node_type) node_type = _decodeNodeType(node_type)
uuid = _decodeUUID(uuid) uuid = _decodeUUID(uuid)
your_uuid == _decodeUUID(uuid) your_uuid == _decodeUUID(uuid)
return (node_type, uuid, address, num_partitions, num_replicas, your_uuid) return (node_type, uuid, address, num_partitions, num_replicas,
your_uuid)
class AskPrimary(Packet): class AskPrimary(Packet):
""" """
...@@ -395,8 +397,9 @@ class AnswerLastIDs(Packet): ...@@ -395,8 +397,9 @@ class AnswerLastIDs(Packet):
Reply to Ask Last IDs. S -> PM, PM -> S. Reply to Ask Last IDs. S -> PM, PM -> S.
""" """
def _encode(self, loid, ltid, lptid): def _encode(self, loid, ltid, lptid):
# in this case, loid is a valid OID but considered as invalid. This is not # in this case, loid is a valid OID but considered as invalid. This is
# an issue because the OID 0 is hard coded and will never be generated # not an issue because the OID 0 is hard coded and will never be
# generated
if loid is None: if loid is None:
loid = INVALID_OID loid = INVALID_OID
ltid = _encodeTID(ltid) ltid = _encodeTID(ltid)
...@@ -882,7 +885,8 @@ class AnswerTransactionInformation(Packet): ...@@ -882,7 +885,8 @@ class AnswerTransactionInformation(Packet):
Answer information (user, description) about a transaction. S -> Any. Answer information (user, description) about a transaction. S -> Any.
""" """
def _encode(self, tid, user, desc, ext, oid_list): def _encode(self, tid, user, desc, ext, oid_list):
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), len(oid_list))] body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext),
len(oid_list))]
body.append(user) body.append(user)
body.append(desc) body.append(desc)
body.append(ext) body.append(ext)
...@@ -1268,7 +1272,8 @@ class PacketRegistry(dict): ...@@ -1268,7 +1272,8 @@ class PacketRegistry(dict):
StartOperation = register(0x000B, StartOperation) StartOperation = register(0x000B, StartOperation)
StopOperation = register(0x000C, StopOperation) StopOperation = register(0x000C, StopOperation)
AskUnfinishedTransactions = register(0x000D, AskUnfinishedTransactions) AskUnfinishedTransactions = register(0x000D, AskUnfinishedTransactions)
AnswerUnfinishedTransactions = register(0x800d, AnswerUnfinishedTransactions) AnswerUnfinishedTransactions = register(0x800d,
AnswerUnfinishedTransactions)
AskObjectPresent = register(0x000f, AskObjectPresent) AskObjectPresent = register(0x000f, AskObjectPresent)
AnswerObjectPresent = register(0x800f, AnswerObjectPresent) AnswerObjectPresent = register(0x800f, AnswerObjectPresent)
DeleteTransaction = register(0x0010, DeleteTransaction) DeleteTransaction = register(0x0010, DeleteTransaction)
...@@ -1293,7 +1298,8 @@ class PacketRegistry(dict): ...@@ -1293,7 +1298,8 @@ class PacketRegistry(dict):
AskTIDs = register(0x001C, AskTIDs) AskTIDs = register(0x001C, AskTIDs)
AnswerTIDs = register(0x801D, AnswerTIDs) AnswerTIDs = register(0x801D, AnswerTIDs)
AskTransactionInformation = register(0x001E, AskTransactionInformation) AskTransactionInformation = register(0x001E, AskTransactionInformation)
AnswerTransactionInformation = register(0x801E, AnswerTransactionInformation) AnswerTransactionInformation = register(0x801E,
AnswerTransactionInformation)
AskObjectHistory = register(0x001F, AskObjectHistory) AskObjectHistory = register(0x001F, AskObjectHistory)
AnswerObjectHistory = register(0x801F, AnswerObjectHistory) AnswerObjectHistory = register(0x801F, AnswerObjectHistory)
AskOIDs = register(0x0020, AskOIDs) AskOIDs = register(0x0020, AskOIDs)
......
...@@ -249,7 +249,8 @@ class PartitionTable(object): ...@@ -249,7 +249,8 @@ class PartitionTable(object):
line.append('X' * len(node_list)) line.append('X' * len(node_list))
else: else:
cell = [] cell = []
cell_dict = dict([(node_dict.get(x.getUUID(), None), x) for x in row]) cell_dict = dict([(node_dict.get(x.getUUID(), None), x)
for x in row])
for node in xrange(len(node_list)): for node in xrange(len(node_list)):
if node in cell_dict: if node in cell_dict:
cell.append(cell_state_dict[cell_dict[node].getState()]) cell.append(cell_state_dict[cell_dict[node].getState()])
......
...@@ -37,7 +37,8 @@ class BaseMasterHandler(BaseStorageHandler): ...@@ -37,7 +37,8 @@ class BaseMasterHandler(BaseStorageHandler):
raise PrimaryFailure('re-election occurs') raise PrimaryFailure('re-election occurs')
def notifyClusterInformation(self, conn, packet, state): def notifyClusterInformation(self, conn, packet, state):
logging.error('ignoring notify cluster information in %s' % self.__class__.__name__) logging.error('ignoring notify cluster information in %s' %
self.__class__.__name__)
def notifyLastOID(self, conn, packet, oid): def notifyLastOID(self, conn, packet, oid):
self.app.loid = oid self.app.loid = oid
...@@ -104,7 +105,8 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): ...@@ -104,7 +105,8 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
if t is None: if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid)) p = protocol.tidNotFound('%s does not exist' % dump(tid))
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], t[0]) p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[0])
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
def askObject(self, conn, packet, oid, serial, tid): def askObject(self, conn, packet, oid, serial, tid):
......
...@@ -83,7 +83,8 @@ class VerificationHandler(BaseMasterHandler): ...@@ -83,7 +83,8 @@ class VerificationHandler(BaseMasterHandler):
if t is None: if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid)) p = protocol.tidNotFound('%s does not exist' % dump(tid))
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], t[0]) p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[0])
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
def askObjectPresent(self, conn, packet, oid, tid): def askObjectPresent(self, conn, packet, oid, tid):
......
...@@ -444,7 +444,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -444,7 +444,7 @@ class MySQLDatabaseManager(DatabaseManager):
q("""INSERT INTO obj SELECT * FROM tobj WHERE tobj.serial = %d""" \ q("""INSERT INTO obj SELECT * FROM tobj WHERE tobj.serial = %d""" \
% tid) % tid)
q("""DELETE FROM tobj WHERE serial = %d""" % tid) q("""DELETE FROM tobj WHERE serial = %d""" % tid)
q("""INSERT INTO trans SELECT * FROM ttrans WHERE ttrans.tid = %d""" \ q("""INSERT INTO trans SELECT * FROM ttrans WHERE ttrans.tid = %d"""
% tid) % tid)
q("""DELETE FROM ttrans WHERE tid = %d""" % tid) q("""DELETE FROM ttrans WHERE tid = %d""" % tid)
except: except:
......
...@@ -126,7 +126,8 @@ class Replicator(object): ...@@ -126,7 +126,8 @@ class Replicator(object):
partition.setCriticalTID(tid) partition.setCriticalTID(tid)
del self.critical_tid_dict[msg_id] del self.critical_tid_dict[msg_id]
except KeyError: except KeyError:
logging.debug("setCriticalTID raised KeyError for msg_id %s" %(msg_id,)) logging.debug("setCriticalTID raised KeyError for msg_id %s" %
(msg_id, ))
def _askCriticalTID(self): def _askCriticalTID(self):
conn = self.primary_master_connection conn = self.primary_master_connection
...@@ -164,7 +165,8 @@ class Replicator(object): ...@@ -164,7 +165,8 @@ class Replicator(object):
addr = node.getAddress() addr = node.getAddress()
if addr is None: if addr is None:
logging.error("no address known for the selected node %s" %(dump(node.getUUID()))) logging.error("no address known for the selected node %s" %
(dump(node.getUUID()), ))
return return
if self.current_connection is not None: if self.current_connection is not None:
if self.current_connection.getAddress() == addr: if self.current_connection.getAddress() == addr:
...@@ -177,8 +179,7 @@ class Replicator(object): ...@@ -177,8 +179,7 @@ class Replicator(object):
if self.current_connection is None: if self.current_connection is None:
handler = replication.ReplicationHandler(app) handler = replication.ReplicationHandler(app)
self.current_connection = ClientConnection(app.em, handler, self.current_connection = ClientConnection(app.em, handler,
addr = addr, addr = addr, connector_handler = app.connector_handler)
connector_handler = app.connector_handler)
p = Packets.RequestIdentification(NodeTypes.STORAGE, p = Packets.RequestIdentification(NodeTypes.STORAGE,
app.uuid, app.server, app.name) app.uuid, app.server, app.name)
self.current_connection.ask(p) self.current_connection.ask(p)
...@@ -196,7 +197,8 @@ class Replicator(object): ...@@ -196,7 +197,8 @@ class Replicator(object):
# Notify to a primary master node that my cell is now up-to-date. # Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection conn = self.primary_master_connection
p = Packets.NotifyPartitionChanges(app.pt.getID(), p = Packets.NotifyPartitionChanges(app.pt.getID(),
[(self.current_partition.getRID(), app.uuid, CellStates.UP_TO_DATE)]) [(self.current_partition.getRID(), app.uuid,
CellStates.UP_TO_DATE)])
conn.notify(p) conn.notify(p)
except KeyError: except KeyError:
pass pass
...@@ -234,7 +236,8 @@ class Replicator(object): ...@@ -234,7 +236,8 @@ class Replicator(object):
self._askUnfinishedTIDs() self._askUnfinishedTIDs()
else: else:
if self.replication_done: if self.replication_done:
logging.info('replication is done for %s' %(self.current_partition.getRID(),)) logging.info('replication is done for %s' %
(self.current_partition.getRID(), ))
self._finishReplication() self._finishReplication()
def removePartition(self, rid): def removePartition(self, rid):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment