Commit 99d73db1 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Add dropUnfinishedData into the database API so tht out-of-dated cells can...

Add dropUnfinishedData into the database API so tht out-of-dated cells can drop invalid data. Fix some bugs related to additional storage nodes.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@164 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c9f67e6c
...@@ -57,7 +57,8 @@ class EventHandler(object): ...@@ -57,7 +57,8 @@ class EventHandler(object):
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
"""Called when a packet is received.""" """Called when a packet is received."""
logging.debug('packet received from %s:%d', *(conn.getAddress())) logging.debug('packet %d:%x received from %s:%d',
packet.getId(), packet.getType(), *(conn.getAddress()))
self.dispatch(conn, packet) self.dispatch(conn, packet)
def packetMalformed(self, conn, packet, error_message): def packetMalformed(self, conn, packet, error_message):
......
...@@ -291,6 +291,7 @@ class Application(object): ...@@ -291,6 +291,7 @@ class Application(object):
def broadcastPartitionChanges(self, ptid, cell_list): def broadcastPartitionChanges(self, ptid, cell_list):
"""Broadcast a Notify Partition Changes packet.""" """Broadcast a Notify Partition Changes packet."""
self.pt.log()
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID()) n = self.nm.getNodeByUUID(c.getUUID())
......
import logging import logging
from copy import copy
from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE
...@@ -6,6 +7,7 @@ from neo.master.handler import MasterEventHandler ...@@ -6,6 +7,7 @@ from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, INVALID_UUID
from neo.exception import OperationFailure, ElectionFailure from neo.exception import OperationFailure, ElectionFailure
from neo.node import ClientNode, StorageNode, MasterNode from neo.node import ClientNode, StorageNode, MasterNode
from neo.util import dump
class FinishingTransaction(object): class FinishingTransaction(object):
"""This class describes a finishing transaction.""" """This class describes a finishing transaction."""
...@@ -109,22 +111,24 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -109,22 +111,24 @@ class ServiceEventHandler(MasterEventHandler):
conn.abort() conn.abort()
return return
# Here are many situations. In principle, a node should be identified by # Here are many situations. In principle, a node should be identified
# an UUID, since an UUID never change when moving a storage node to a different # by an UUID, since an UUID never change when moving a storage node
# server, and an UUID always changes for a master node and a client node whenever # to a different server, and an UUID always changes for a master node
# it restarts, so more reliable than a server address. # and a client node whenever it restarts, so more reliable than a
# server address.
# #
# However, master nodes can be known only as the server addresses. And, a node # However, master nodes can be known only as the server addresses.
# may claim a server address used by another node. # And, a node may claim a server address used by another node.
addr = (ip_address, port) addr = (ip_address, port)
# First, get the node by the UUID. # First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
old_node = None
if node is None: if node is None:
# If nothing is present, try with the server address. # If nothing is present, try with the server address.
node = app.nm.getNodeByServer(addr) node = app.nm.getNodeByServer(addr)
if node is None: if node is None:
# Nothing is found. So this must be the first time that this node # Nothing is found. So this must be the first time that
# connected to me. # this node connected to me.
if node_type == MASTER_NODE_TYPE: if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid) node = MasterNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE: elif node_type == CLIENT_NODE_TYPE:
...@@ -135,15 +139,18 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -135,15 +139,18 @@ class ServiceEventHandler(MasterEventHandler):
logging.debug('broadcasting node information') logging.debug('broadcasting node information')
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
else: else:
# Otherwise, I know it only by the server address or the same server # Otherwise, I know it only by the server address or the same
# address but with a different UUID. # server address but with a different UUID.
if node.getUUID() is None: if node.getUUID() is None:
# This must be a master node. # This must be a master node.
if not isinstance(node, MasterNode) or node_type != MASTER_NODE_TYPE: if not isinstance(node, MasterNode) \
# Error. This node uses the same server address as a master or node_type != MASTER_NODE_TYPE:
# node. # Error. This node uses the same server address as
conn.addPacket(Packet().protocolError(packet.getId(), # a master node.
'invalid server address')) p = Packet()
p.protocolError(packet.getId(),
'invalid server address')
conn.addPacket(p)
conn.abort() conn.abort()
return return
...@@ -156,47 +163,59 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -156,47 +163,59 @@ class ServiceEventHandler(MasterEventHandler):
# This node has a different UUID. # This node has a different UUID.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(), p = Packet()
'invalid server address')) p.protocolError(packet.getId(),
'invalid server address')
conn.addPacket(p)
conn.abort() conn.abort()
return return
else: else:
# Otherwise, forget the old one. # Otherwise, forget the old one.
node.setState(BROKEN_STATE) node.setState(DOWN_STATE)
logging.debug('broadcasting node information') logging.debug('broadcasting node information')
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
app.nm.remove(node)
old_node = node
node = copy(node)
# And insert a new one. # And insert a new one.
node.setUUID(uuid) node.setUUID(uuid)
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
logging.debug('broadcasting node information') logging.debug('broadcasting node information')
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
app.nm.add(node)
else: else:
# I know this node by the UUID. # I know this node by the UUID.
try: try:
ip_address, port = node.getServer() ip_address, port = node.getServer()
except TypeError: except TypeError:
ip_address, port = '0.0.0.0', 0 ip_address, port = '0.0.0.0', 0
if (ip_address, port) != addr: if (ip_address, port) != addr:
# This node has a different server address. # This node has a different server address.
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
# If it is still running, reject this node. # If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(), p = Packet()
'invalid server address')) p.protocolError(packet.getId(),
'invalid server address')
conn.addPacket(p)
conn.abort() conn.abort()
return return
else: else:
# Otherwise, forget the old one. # Otherwise, forget the old one.
node.setState(BROKEN_STATE) node.setState(DOWN_STATE)
logging.debug('broadcasting node information') logging.debug('broadcasting node information')
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
app.nm.remove(node)
old_node = node
node = copy(node)
# And insert a new one. # And insert a new one.
node.setServer(addr) node.setServer(addr)
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
logging.debug('broadcasting node information') logging.debug('broadcasting node information')
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
app.nm.add(node)
else: else:
# If this node is broken, reject it. Otherwise, assume that it is # If this node is broken, reject it. Otherwise, assume that
# working again. # it is working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
p = Packet() p = Packet()
p.brokenNodeDisallowedError(packet.getId(), 'go away') p.brokenNodeDisallowedError(packet.getId(), 'go away')
...@@ -214,7 +233,15 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -214,7 +233,15 @@ class ServiceEventHandler(MasterEventHandler):
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
# If this is a storage node, add it into the partition table. # If this is a storage node, add it into the partition table.
# Note that this does no harm, even if the node is not new. # Note that this does no harm, even if the node is not new.
cell_list = app.pt.addNode(node) if old_node is not None:
logging.info('dropping %s from a partition table',
dump(old_node.getUUID()))
cell_list = app.pt.dropNode(old_node)
else:
cell_list = []
logging.info('adding %s into a partition table',
dump(node.getUUID()))
cell_list.extend(app.pt.addNode(node))
if len(cell_list) != 0: if len(cell_list) != 0:
ptid = app.getNextPartitionTableID() ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, cell_list) app.broadcastPartitionChanges(ptid, cell_list)
......
...@@ -193,13 +193,16 @@ class PartitionTable(object): ...@@ -193,13 +193,16 @@ class PartitionTable(object):
for offset, row in enumerate(self.partition_list): for offset, row in enumerate(self.partition_list):
if row is not None: if row is not None:
for cell in row: for cell in row:
if cell.getNode() == node: if cell.getNode() is node:
if cell.getState() != FEEDING_STATE: if cell.getState() != FEEDING_STATE:
# If this cell is not feeding, find another node to be added. # If this cell is not feeding, find another node
node = self.findLeastUsedNode([cell.getNode() for cell in row]) # to be added.
if node is not None: node_list = [c.getNode() for c in row]
row.append(Cell(node, OUT_OF_DATE_STATE)) n = self.findLeastUsedNode(node_list)
cell_list.append((offset, node.getUUID(), if n is not None:
row.append(Cell(n, OUT_OF_DATE_STATE))
self.count_dict[n] += 1
cell_list.append((offset, n.getUUID(),
OUT_OF_DATE_STATE)) OUT_OF_DATE_STATE))
row.remove(cell) row.remove(cell)
cell_list.append((offset, uuid, DISCARDED_STATE)) cell_list.append((offset, uuid, DISCARDED_STATE))
......
...@@ -114,6 +114,7 @@ class Application(object): ...@@ -114,6 +114,7 @@ class Application(object):
# start the operation. This cycle will be executed permentnly, # start the operation. This cycle will be executed permentnly,
# until the user explicitly requests a shutdown. # until the user explicitly requests a shutdown.
while 1: while 1:
self.operational = False
self.connectToPrimaryMaster() self.connectToPrimaryMaster()
try: try:
while 1: while 1:
...@@ -122,6 +123,7 @@ class Application(object): ...@@ -122,6 +123,7 @@ class Application(object):
self.doOperation() self.doOperation()
except OperationFailure: except OperationFailure:
logging.error('operation stopped') logging.error('operation stopped')
self.operational = False
except PrimaryFailure: except PrimaryFailure:
logging.error('primary master is down') logging.error('primary master is down')
...@@ -203,7 +205,6 @@ class Application(object): ...@@ -203,7 +205,6 @@ class Application(object):
for conn in em.getConnectionList(): for conn in em.getConnectionList():
conn.setHandler(handler) conn.setHandler(handler)
self.operational = False
while not self.operational: while not self.operational:
em.poll(1) em.poll(1)
...@@ -239,6 +240,9 @@ class Application(object): ...@@ -239,6 +240,9 @@ class Application(object):
# This is a queue of events used to delay operations due to locks. # This is a queue of events used to delay operations due to locks.
self.event_queue = deque() self.event_queue = deque()
# Forget all unfinished data.
self.dm.dropUnfinishedData()
while 1: while 1:
em.poll(1) em.poll(1)
......
...@@ -97,6 +97,10 @@ class DatabaseManager(object): ...@@ -97,6 +97,10 @@ class DatabaseManager(object):
thrown away.""" thrown away."""
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
def dropUnfinishedData(self):
"""Drop any unfinished data from a database."""
raise NotImplementedError('this method must be overridden')
def storeTransaction(self, tid, object_list, transaction): def storeTransaction(self, tid, object_list, transaction):
"""Store a transaction temporarily. Note that this transaction """Store a transaction temporarily. Note that this transaction
is not finished yet. The list of objects contains tuples, is not finished yet. The list of objects contains tuples,
......
...@@ -332,7 +332,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -332,7 +332,7 @@ class MySQLDatabaseManager(DatabaseManager):
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
uuid = e(uuid) uuid = e(uuid)
if state == DISCARDED_STATE: if state == DISCARDED_STATE:
q("""DELETE FROM pt WHERE offset = %d AND uuid = '%s'""" \ q("""DELETE FROM pt WHERE rid = %d AND uuid = '%s'""" \
% (offset, uuid)) % (offset, uuid))
else: else:
q("""INSERT INTO pt VALUES (%d, '%s', %d) q("""INSERT INTO pt VALUES (%d, '%s', %d)
...@@ -351,6 +351,17 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -351,6 +351,17 @@ class MySQLDatabaseManager(DatabaseManager):
def setPartitionTable(self, ptid, cell_list): def setPartitionTable(self, ptid, cell_list):
self.doSetPartitionTable(ptid, cell_list, False) self.doSetPartitionTable(ptid, cell_list, False)
def dropUnfinishedData(self):
q = self.query
self.begin()
try:
q("""TRUNCATE tobj""")
q("""TRUNCATE ttrans""")
except:
self.rollback()
raise
self.commit()
def storeTransaction(self, tid, object_list, transaction): def storeTransaction(self, tid, object_list, transaction):
q = self.query q = self.query
e = self.escape e = self.escape
......
...@@ -123,8 +123,11 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -123,8 +123,11 @@ class VerificationEventHandler(StorageEventHandler):
try: try:
for offset in offset_list: for offset in offset_list:
row = [] row = []
for cell in app.pt.getCellList(offset): try:
row.append((cell.getUUID(), cell.getState())) for cell in app.pt.getCellList(offset):
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row)) row_list.append((offset, row))
except IndexError: except IndexError:
p = Packet() p = Packet()
...@@ -155,7 +158,7 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -155,7 +158,7 @@ class VerificationEventHandler(StorageEventHandler):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is None: if node is None:
node = StorageNode(uuid = uuid) node = StorageNode(uuid = uuid)
if uuid != self.uuid: if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node) nm.add(node)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment