Commit e7639d7a authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix misc bugs.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@72 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 6fcbdc65
......@@ -2,6 +2,8 @@ import logging
from select import select
from time import time
from neo.protocol import Packet
class IdleEvent(object):
"""This class represents an event called when a connection is waiting for
a message too long."""
......@@ -72,8 +74,12 @@ class EventManager(object):
conn.readable()
for s in wlist:
conn = self.connection_dict[s]
conn.writable()
# This can fail, if a connection is closed in readable().
try:
conn = self.connection_dict[s]
conn.writable()
except KeyError:
pass
# Check idle events. Do not check them out too often, because this
# is somehow heavy.
......@@ -83,9 +89,13 @@ class EventManager(object):
if t - self.prev_time >= 1:
self.prev_time = t
event_list.sort(key = lambda event: event.getTime())
for event in tuple(event_list):
while event_list:
event = event_list[0]
if event(t):
event_list.pop(0)
try:
event_list.remove(event)
except ValueError:
pass
else:
break
......
......@@ -7,6 +7,7 @@ from neo.connection import ClientConnection
from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID
from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
class ElectionEventHandler(MasterEventHandler):
"""This class deals with events for a primary master election."""
......@@ -174,7 +175,7 @@ class ElectionEventHandler(MasterEventHandler):
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
ap.unconnected_master_node_set.add(addr)
app.unconnected_master_node_set.add(addr)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
......
......@@ -6,7 +6,7 @@ from neo.protocol import MASTER_NODE_TYPE, \
from neo.master.handler import MasterEventHandler
from neo.exception import ElectionFailure
from neo.protocol import Packet, INVALID_UUID
from neo.node import ClientNode, StorageNode
from neo.node import ClientNode, StorageNode, MasterNode
from neo.util import dump
class RecoveryEventHandler(MasterEventHandler):
......@@ -88,7 +88,7 @@ class RecoveryEventHandler(MasterEventHandler):
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
else:
node = StorageNode(server = address, uuid = uuid)
node = StorageNode(server = addr, uuid = uuid)
app.nm.add(node)
app.broadcastNodeInformation(node)
else:
......
......@@ -706,7 +706,7 @@ class Packet(object):
def _decodeNotifyNodeInformation(self):
try:
n = unpack('!L', self._body[:4])
n = unpack('!L', self._body[:4])[0]
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', self._body[4+i*26:30+i*26])
......@@ -721,7 +721,7 @@ class Packet(object):
raise
except:
raise ProtocolError(self, 'invalid answer node information')
return node_list
return (node_list,)
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskLastIDs(self):
......@@ -745,7 +745,7 @@ class Packet(object):
offset_list.append(offset)
except:
raise ProtocolError(self, 'invalid ask partition table')
return offset_list
return (offset_list,)
decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable
def _decodeAnswerPartitionTable(self):
......@@ -820,7 +820,7 @@ class Packet(object):
tid_list.append(tid)
except:
raise ProtocolError(self, 'invalid answer unfinished transactions')
return tid_list
return (tid_list,)
decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
def _decodeAskObjectPresent(self):
......@@ -844,7 +844,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid delete transaction')
return tid
return (tid,)
decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction
def _decodeCommitTransaction(self):
......@@ -852,7 +852,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid commit transaction')
return tid
return (tid,)
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
def _decodeAskNewTID(self):
......@@ -864,7 +864,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid answer new tid')
return tid
return (tid,)
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
def _decodeAskNewOIDs(self):
......@@ -872,7 +872,7 @@ class Packet(object):
num_oids = unpack('!H', self._body)
except:
raise ProtocolError(self, 'invalid ask new oids')
return num_oids
return (num_oids,)
decode_table[ASK_NEW_OIDS] = _decodeAskNewOIDs
def _decodeAnswerNewOIDs(self):
......@@ -884,7 +884,7 @@ class Packet(object):
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid answer new oids')
return oid_list
return (oid_list,)
decode_table[ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
def _decodeFinishTransaction(self):
......@@ -904,7 +904,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid notify transactin finished')
return tid
return (tid,)
decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
def _decodeLockInformation(self):
......@@ -912,7 +912,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid lock information')
return tid
return (tid,)
decode_table[LOCK_INFORMATION] = _decodeLockInformation
def _decodeNotifyInformationLocked(self):
......@@ -920,7 +920,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid notify information locked')
return tid
return (tid,)
decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
def _decodeInvalidateObjects(self):
......@@ -932,7 +932,7 @@ class Packet(object):
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid finish transaction')
return oid_list
return (oid_list,)
decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects
def _decodeUnlockInformation(self):
......@@ -948,7 +948,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid abort transaction')
return tid
return (tid,)
decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction
def _decodeAskStoreObject(self):
......@@ -997,7 +997,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid answer store transaction')
return tid
return (tid,)
decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
def _decodeAskObject(self):
......@@ -1037,7 +1037,7 @@ class Packet(object):
tid_list.append(tid)
except:
raise ProtocolError(self, 'invalid answer tids')
return tid_list
return (tid_list,)
decode_table[ANSWER_TIDS] = _decodeAnswerTIDs
def _decodeAskTransactionInformation(self):
......@@ -1045,7 +1045,7 @@ class Packet(object):
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid ask transaction information')
return tid
return (tid,)
decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
def _decodeAnswerTransactionInformation(self):
......
......@@ -16,7 +16,7 @@ from neo.util import dump
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import OperationFailure, PrimaryFailure
from neo.pt import PartitionTable
from neo.storage.bootstrap import BoostrapEventHandler
from neo.storage.bootstrap import BootstrapEventHandler
from neo.storage.verification import VerificationEventHandler
from neo.storage.operation import OperationEventHandler
......@@ -26,10 +26,10 @@ class Application(object):
def __init__(self, file, section, reset = False):
config = ConfigurationManager(file, section)
self.num_partitions = config.getPartitions()
self.num_partitions = None
self.num_replicas = None
self.name = config.getName()
logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s',
self.num_replicas, self.num_partitions, self.name)
logging.debug('the name is %s', self.name)
self.server = config.getServer()
logging.debug('IP address is %s, port is %d', *(self.server))
......@@ -43,7 +43,9 @@ class Application(object):
self.dm = MySQLDatabaseManager(database = config.getDatabase(),
user = config.getUser(),
password = config.getPassword())
self.pt = PartitionTable(self.num_partitions, 0)
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.primary_master_node = None
......@@ -66,11 +68,7 @@ class Application(object):
self.uuid = uuid
dm.setUUID(uuid)
num_partitions = dm.getNumPartitions()
if num_partitions is None:
dm.setNumPartitions(self.num_partitions)
elif num_partitions != self.num_partitions:
raise RuntimeError('partitions do not match with the database')
self.num_partitions = dm.getNumPartitions()
name = dm.getName()
if name is None:
......@@ -105,7 +103,7 @@ class Application(object):
def run(self):
"""Make sure that the status is sane and start a loop."""
if self.num_partitions <= 0:
if self.num_partitions is not None and self.num_partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
if len(self.name) == 0:
raise RuntimeError, 'cluster name must be non-empty'
......@@ -144,8 +142,9 @@ class Application(object):
# Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition
# table, because the table might be incomplete.
self.loadPartitionTable()
self.ptid = self.dm.getPTID()
if self.pt is not None:
self.loadPartitionTable()
self.ptid = self.dm.getPTID()
handler = BootstrapEventHandler(self)
em = self.em
......
......@@ -3,10 +3,11 @@ import logging
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
from neo.connection import ClientConnection
from neo.protocol import Packet
from neo.pt import PartitionTable
class BootstrapEventHandler(StorageEventHandler):
"""This class deals with events for a bootstrap phase."""
......@@ -19,7 +20,7 @@ class BootstrapEventHandler(StorageEventHandler):
p = Packet()
msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, app.uuid,
p.requestNodeIdentification(msg_id, STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
......@@ -36,6 +37,8 @@ class BootstrapEventHandler(StorageEventHandler):
# So this would effectively mean that it is dead.
app.primary_master_node = None
app.trying_master_node = None
StorageEventHandler.connectionFailed(self, conn)
def connectionAccepted(self, conn, s, addr):
......@@ -46,6 +49,7 @@ class BootstrapEventHandler(StorageEventHandler):
def timeoutExpired(self, conn):
if isinstance(conn, ClientConnection):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None
......@@ -56,6 +60,7 @@ class BootstrapEventHandler(StorageEventHandler):
def connectionClosed(self, conn):
if isinstance(conn, ClientConnection):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node closes, I should not rely on it.
app.primary_master_node = None
......@@ -66,6 +71,7 @@ class BootstrapEventHandler(StorageEventHandler):
def peerBroken(self, conn):
if isinstance(conn, ClientConnection):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node gets broken, I should not rely
# on it.
......@@ -77,6 +83,7 @@ class BootstrapEventHandler(StorageEventHandler):
def handleNotReady(self, conn, packet, message):
if isinstance(conn, ClientConnection):
app = self.app
if app.trying_master_node is not None:
app.trying_master_node = None
......@@ -87,6 +94,7 @@ class BootstrapEventHandler(StorageEventHandler):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
app = self.app
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
conn.addPacket(Packet().notReady(packet.getId(), 'retry later'))
......@@ -120,14 +128,16 @@ class BootstrapEventHandler(StorageEventHandler):
p = Packet()
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1])
app.uuid, app.server[0], app.server[1],
0, 0)
conn.addPacket(p)
# Now the master node should know that I am not the right one.
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
uuid, ip_address, port,
num_partitions, num_replicas):
if not isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
......@@ -143,11 +153,23 @@ class BootstrapEventHandler(StorageEventHandler):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
if app.num_partitions is None:
app.num_partitions = num_partitions
app.num_replicas = num_replicas
app.pt = PartitionTable(num_partitions, num_replicas)
app.loadPartitionTable()
app.ptid = app.dm.getPTID()
elif app.num_partitions != num_partitions:
raise RuntimeError('the number of partitions is inconsistent')
elif app.num_replicas != num_replicas:
raise RuntimeError('the number of replicas is inconsistent')
conn.setUUID(uuid)
node.setUUID(uuid)
......@@ -169,7 +191,6 @@ class BootstrapEventHandler(StorageEventHandler):
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
......
......@@ -3,9 +3,9 @@ import logging
from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
from neo.connection import ClientConnection
class StorageEventHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
......@@ -18,7 +18,8 @@ class StorageEventHandler(EventHandler):
raise NotImplementedError('this method must be overridden')
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
uuid, ip_address, port,
num_partitions, num_replicas):
raise NotImplementedError('this method must be overridden')
def handleAskPrimaryMaster(self, conn, packet):
......
......@@ -94,7 +94,7 @@ class MySQLDatabaseManager(DatabaseManager):
tid BINARY(8) NOT NULL PRIMARY KEY,
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
desc BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL
) ENGINE = InnoDB""")
......@@ -113,7 +113,7 @@ class MySQLDatabaseManager(DatabaseManager):
tid BINARY(8) NOT NULL,
oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL,
desc BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL
) ENGINE = InnoDB""")
......@@ -402,10 +402,11 @@ class MySQLDatabaseManager(DatabaseManager):
e = self.escape
tid = e(tid)
self.begin()
r = q("""SELECT oids, user, desc, ext FROM trans WHERE tid = '%s'""" \
r = q("""SELECT oids, user, description, ext FROM trans
WHERE tid = '%s'""" \
% tid)
if not r and all:
r = q("""SELECT oids, user, desc, ext FROM ttrans
r = q("""SELECT oids, user, description, ext FROM ttrans
WHERE tid = '%s'""" \
% tid)
self.commit()
......
......@@ -3,9 +3,9 @@ import logging
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
from neo.connection import ClientConnection
from neo.protocol import Packet
class TransactionInformation(object):
......@@ -161,7 +161,8 @@ class OperationEventHandler(StorageEventHandler):
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
uuid, ip_address, port,
num_partitions, num_replicas):
if isinstance(conn, ClientConnection):
raise NotImplementedError
else:
......
......@@ -3,10 +3,11 @@ import logging
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
from neo.connection import ClientConnection
from neo.protocol import Packet
from neo.exception import PrimaryFailure
class VerificationEventHandler(StorageEventHandler):
"""This class deals with events for a verification phase."""
......@@ -86,7 +87,8 @@ class VerificationEventHandler(StorageEventHandler):
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
uuid, ip_address, port,
num_partitions, num_replicas):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment