Commit 8296c3c7 authored by Yoshinori Okuji's avatar Yoshinori Okuji

A lot of bugfixes.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@222 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 440d0981
import logging
import os
from threading import Lock, local
from threading import Lock, RLock, local
from cPickle import dumps, loads
from zlib import compress, decompress
from Queue import Queue, Empty
from random import shuffle
from time import sleep
from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
TEMPORARILY_DOWN_STATE, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
from neo.client.handler import ClientEventHandler
from neo.client.Storage import NEOStorageError, NEOStorageConflictError, \
......@@ -31,7 +32,7 @@ class ConnectionPool(object):
# Define a lock in order to create one connection to
# a storage node at a time to avoid multiple connections
# to the same node.
l = Lock()
l = RLock()
self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release
......@@ -40,41 +41,50 @@ class ConnectionPool(object):
addr = node.getNode().getServer()
if addr is None:
return None
handler = ClientEventHandler(self.app, self.app.dispatcher)
conn = MTClientConnection(self.app.em, handler, addr)
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
self.app.uuid, addr[0],
addr[1], self.app.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.app.dispatcher.register(conn, msg_id, self.app.getQueue())
self.app.local_var.storage_node = None
finally:
conn.unlock()
self.app._waitMessage(conn, msg_id)
if self.app.storage_node == -1:
# Connection failed, notify primary master node
logging.error('Connection to storage node %s failed' %(addr,))
conn = self.app.master_conn
if node.getState() != RUNNING_STATE:
return None
app = self.app
handler = ClientEventHandler(app, app.dispatcher)
# Loop until a connection is obtained.
while 1:
logging.info('trying to connect to %s:%d', *addr)
app.local_var.node_not_ready = 0
conn = MTClientConnection(app.em, handler, addr)
conn.lock()
try:
if conn.getSocket() is None:
# This happens, if a connection could not be established.
logging.error('Connection to storage node %s failed', addr)
return None
msg_id = conn.getNextId()
p = Packet()
node_list = [(STORAGE_NODE_TYPE, addr[0], addr[1],
node.getUUID(), TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
app.uuid, addr[0],
addr[1], app.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
app.dispatcher.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
return None
logging.info('connected to storage node %s:%d', *(conn.getAddress()))
return conn
try:
app._waitMessage(conn, msg_id)
except NEOStorageError:
logging.error('Connection to storage node %s failed', addr)
return None
if app.local_var.node_not_ready:
# Connection failed, notify primary master node
logging.info('Storage node %s not ready', addr)
else:
logging.info('connected to storage node %s:%d', *addr)
return conn
sleep(1)
def _dropConnections(self):
"""Drop connections."""
......@@ -98,9 +108,16 @@ class ConnectionPool(object):
if len(self.connection_dict) > self.max_pool_size:
# must drop some unused connections
self._dropConnections()
conn = self._initNodeConnection(node)
self.connection_lock_release()
try:
conn = self._initNodeConnection(node)
finally:
self.connection_lock_acquire()
if conn is None:
return None
# add node to node manager
if self.app.nm.getNodeByServer(node.getServer()) is None:
n = StorageNode(node.getServer())
......@@ -112,9 +129,9 @@ class ConnectionPool(object):
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
uuid = node.getUUID()
self.connection_lock_acquire()
try:
uuid = node.getUUID()
try:
conn = self.connection_dict[uuid]
# Already connected to node
......@@ -217,13 +234,20 @@ class Application(object):
else:
conn, packet = local_queue.get()
if packet is None:
if conn is target_conn:
raise NEOStorageError('connection closed')
else:
continue
conn.lock()
try:
conn.handler.dispatch(conn, packet)
finally:
conn.unlock()
if target_conn is conn and msg_id == packet.getId() and packet.getType() & 0x8000:
if target_conn is conn and msg_id == packet.getId() \
and packet.getType() & 0x8000:
break
def registerDB(self, db, limit):
......@@ -278,61 +302,55 @@ class Application(object):
def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
"""Internal method which manage load ,loadSerial and loadBefore."""
partition_id = u64(oid) % self.num_partitions
# Only used up to date node for retrieving object
cell_list = self.pt.getCellList(partition_id, True)
data = None
if len(cell_list) == 0:
# FIXME must wait for cluster to be ready
raise NEOStorageNotFoundError()
shuffle(cell_list)
self.local_var.asked_object = -1
for cell in cell_list:
logging.debug('trying to load %s from %s',
dump(oid), dump(cell.getUUID()))
conn = self.cp.getConnForNode(cell)
if conn is None:
self.local_var.asked_object = None
while self.local_var.asked_object is None:
cell_list = self.pt.getCellList(partition_id, True)
if len(cell_list) == 0:
sleep(1)
continue
try:
msg_id = conn.getNextId()
p = Packet()
p.askObject(msg_id, oid, serial, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.asked_object = 0
finally:
conn.unlock()
shuffle(cell_list)
self.local_var.asked_object = None
for cell in cell_list:
logging.debug('trying to load %s from %s',
dump(oid), dump(cell.getUUID()))
conn = self.cp.getConnForNode(cell)
if conn is None:
continue
# Wait for answer
# asked object retured value are :
# -1 : oid not found
# other : data
self._waitMessage(conn, msg_id)
if self.local_var.asked_object == -1:
# OID not found
# XXX either try with another node, either raise error here
# for now try with another node
continue
try:
msg_id = conn.getNextId()
p = Packet()
p.askObject(msg_id, oid, serial, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.asked_object = 0
finally:
conn.unlock()
# Check data
noid, start_serial, end_serial, compression, checksum, data \
self._waitMessage(conn, msg_id)
if self.local_var.asked_object == -1:
# OID not found
break
# Check data
noid, start_serial, end_serial, compression, checksum, data \
= self.local_var.asked_object
if noid != oid:
# Oops, try with next node
logging.error('got wrong oid %s instead of %s from node %s' \
% (noid, oid, cell.getServer()))
continue
elif checksum != makeChecksum(data):
# Check checksum.
logging.error('wrong checksum from node %s for oid %s' \
% (cell.getServer(), oid))
continue
else:
# Everything looks alright.
break
if noid != oid:
# Oops, try with next node
logging.error('got wrong oid %s instead of %s from node %s',
noid, oid, cell.getServer())
continue
elif checksum != makeChecksum(data):
# Check checksum.
logging.error('wrong checksum from node %s for oid %s',
cell.getServer(), oid)
continue
else:
# Everything looks alright.
break
if self.local_var.asked_object == -1:
# We didn't got any object from all storage node
......@@ -488,7 +506,7 @@ class Application(object):
oid_list = self.txn_data_dict.keys()
# Store data on each node
partition_id = u64(self.tid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True)
cell_list = self.pt.getCellList(partition_id, False)
for cell in cell_list:
conn = self.cp.getConnForNode(cell)
if conn is None:
......@@ -527,7 +545,7 @@ class Application(object):
aborted_node_set = set()
for oid in self.txn_data_dict.iterkeys():
partition_id = u64(oid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True)
cell_list = self.pt.getCellList(partition_id)
for cell in cell_list:
if cell.getNode() not in aborted_node_set:
conn = self.cp.getConnForNode(cell)
......@@ -546,7 +564,7 @@ class Application(object):
# Abort in nodes where transaction was stored
partition_id = u64(self.tid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True)
cell_list = self.pt.getCellList(partition_id)
for cell in cell_list:
if cell.getNode() not in aborted_node_set:
conn = self.cp.getConnForNode(cell)
......
......@@ -66,7 +66,7 @@ class Dispatcher(Thread):
if app.pt is not None and app.pt.operational():
# Connected to primary master node and got all informations
break
app.node_not_ready = 0
app.local_var.node_not_ready = 0
if app.primary_master_node is None:
# Try with master node defined in config
addr, port = app.master_node_list[master_index].split(':')
......@@ -110,7 +110,7 @@ class Dispatcher(Thread):
elif app.primary_master_node.getServer() != (addr, port):
# Master node changed, connect to new one
break
elif app.node_not_ready:
elif app.local_var.node_not_ready:
# Wait a bit and reask again
break
elif app.pt is not None and app.pt.operational():
......
......@@ -45,6 +45,36 @@ class ClientEventHandler(EventHandler):
# put message in request queue
dispatcher._request_queue.put((conn, packet))
def _dealWithStorageFailure(self, conn, node, state):
app = self.app
# Remove from pool connection
app.cp.removeConnection(node)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
for queue in queue_set:
queue.put((conn, None))
# Notify the primary master node of the failure.
conn = app.master_conn
if conn is not None:
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port,
node.getUUID(), state)]
p.notifyNodeInformation(msg_id, node_list)
conn.addPacket(p)
finally:
conn.unlock()
def connectionFailed(self, conn):
app = self.app
uuid = conn.getUUID()
......@@ -59,15 +89,16 @@ class ClientEventHandler(EventHandler):
self.dispatcher.connectToPrimaryMasterNode(app)
else:
# Connection to a storage node failed
app.storage_node = -1
node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode):
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
EventHandler.connectionFailed(self, conn)
def connectionClosed(self, conn):
uuid = conn.getUUID()
app = self.app
if app.master_conn is None:
EventHandler.connectionClosed(self, conn)
elif uuid == app.master_conn.getUUID():
if app.master_conn is not None and uuid == app.master_conn.getUUID():
logging.critical("connection to primary master node closed")
# Close connection
app.master_conn.close()
......@@ -76,29 +107,14 @@ class ClientEventHandler(EventHandler):
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is not None:
logging.info("connection to storage node %s closed",
node.getServer())
node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode):
# Notify primary master node that a storage node is temporarily down
conn = app.master_conn
if conn is not None:
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
conn.addPacket(p)
finally:
conn.unlock()
# Remove from pool connection
app.cp.removeConnection(node)
EventHandler.connectionClosed(self, conn)
logging.info("connection to storage node %s closed",
node.getServer())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
EventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
uuid = conn.getUUID()
......@@ -109,24 +125,12 @@ class ClientEventHandler(EventHandler):
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
node = app.nm.getNodeByUUID(uuid)
node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode):
# Notify primary master node that a storage node is temporarily down
conn = app.master_conn
if conn is not None:
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
conn.addPacket(p)
finally:
conn.unlock()
# Remove from pool connection
app.cp.removeConnection(node)
# Notify primary master node that a storage node is
# temporarily down.
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
EventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
......@@ -138,31 +142,17 @@ class ClientEventHandler(EventHandler):
logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app)
else:
node = app.nm.getNodeByUUID(uuid)
node = app.nm.getNodeByServer(conn.getAddress())
if isinstance(node, StorageNode):
# Notify primary master node that a storage node is broken
conn = app.master_conn
if conn is not None:
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
BROKEN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
conn.addPacket(p)
finally:
conn.unlock()
# Remove from pool connection
app.cp.removeConnection(node)
self._dealWithStorageFailure(conn, node, BROKEN_STATE)
EventHandler.peerBroken(self, conn)
def handleNotReady(self, conn, packet, message):
if isinstance(conn, MTClientConnection):
app = self.app
app.node_not_ready = 1
app.local_var.node_not_ready = 1
else:
self.handleUnexpectedPacket(conn, packet)
......
......@@ -167,12 +167,15 @@ class EpollEventManager(object):
def poll(self, timeout = 1):
rlist, wlist = self.epoll.poll(timeout)
for fd in rlist:
conn = self.connection_dict[fd]
conn.lock()
try:
conn.readable()
finally:
conn.unlock()
conn = self.connection_dict[fd]
conn.lock()
try:
conn.readable()
finally:
conn.unlock()
except KeyError:
pass
for fd in wlist:
# This can fail, if a connection is closed in readable().
......
from time import time
import logging
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.util import dump
class Node(object):
"""This class represents a node."""
......
......@@ -221,10 +221,6 @@ class Application(object):
for conn in em.getConnectionList():
conn.setHandler(handler)
# Forget all client nodes.
for node in nm.getClientNodeList():
nm.remove(node)
# Forget all unfinished data.
self.dm.dropUnfinishedData()
......
......@@ -99,6 +99,7 @@ class StorageEventHandler(EventHandler):
self.handleUnexpectedPacket(conn, packet)
return
logging.debug('handleNotifyNodeInformation: node_list = %r', node_list)
app = self.app
node = app.nm.getNodeByUUID(uuid)
if not isinstance(node, MasterNode) \
......@@ -142,12 +143,15 @@ class StorageEventHandler(EventHandler):
if state == RUNNING_STATE:
n = app.nm.getNodeByUUID(uuid)
if n is None:
logging.debug('adding client node %s', dump(uuid))
n = ClientNode(uuid = uuid)
app.nm.add(n)
assert app.nm.getNodeByUUID(uuid) is n
else:
self.dealWithClientFailure(uuid)
n = app.nm.getNodeByUUID(uuid)
if n is not None:
logging.debug('removing client node %s', dump(uuid))
app.nm.remove(n)
def handleAskLastIDs(self, conn, packet):
......
......@@ -137,7 +137,7 @@ class OperationEventHandler(StorageEventHandler):
else:
# If I do not know such a node, and it is not even a master
# node, simply reject it.
logging.error('reject an unknown node')
logging.error('reject an unknown node %s', dump(uuid))
conn.addPacket(Packet().notReady(packet.getId(),
'unknown node'))
conn.abort()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment