Commit a89322a4 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix a serious bug in send. Increase a timeout for Finish Transaction. Fix undefined symbols.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@155 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8cf80640
from threading import Thread from threading import Thread
from Queue import Empty, Queue from Queue import Empty, Queue
from neo.protocol import PING, Packet, CLIENT_NODE_TYPE from neo.protocol import PING, Packet, CLIENT_NODE_TYPE, FINISH_TRANSACTION
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.node import MasterNode from neo.node import MasterNode
...@@ -62,9 +62,16 @@ class Dispatcher(Thread): ...@@ -62,9 +62,16 @@ class Dispatcher(Thread):
conn.addPacket(p) conn.addPacket(p)
if tmp_q is not None: if tmp_q is not None:
# We expect an answer # We expect an answer
key = "%s-%s" %(conn.getUUID(),msg_id) key = "%s-%s" %(conn.getUUID(), msg_id)
self.message_table[key] = tmp_q self.message_table[key] = tmp_q
conn.expectMessage(msg_id) # XXX this is a hack. Probably queued tasks themselves
# should specify the timeout values.
if p.getType() == FINISH_TRANSACTION:
# Finish Transaction may take a lot of time when
# many objects are committed at a time.
conn.expectMessage(msg_id, additional_timeout = 300)
else:
conn.expectMessage(msg_id)
except Empty: except Empty:
continue continue
......
...@@ -12,6 +12,7 @@ from neo.exception import ElectionFailure ...@@ -12,6 +12,7 @@ from neo.exception import ElectionFailure
from neo.util import dump from neo.util import dump
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
class ClientEventHandler(EventHandler): class ClientEventHandler(EventHandler):
...@@ -78,13 +79,13 @@ class ClientEventHandler(EventHandler): ...@@ -78,13 +79,13 @@ class ClientEventHandler(EventHandler):
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid == self.app.primary_master_node.getUUID(): app = self.app
if uuid == app.primary_master_node.getUUID():
logging.critical("connection timeout to primary master node expired") logging.critical("connection timeout to primary master node expired")
if self.dispatcher.connecting_to_master_node == 0: if self.dispatcher.connecting_to_master_node == 0:
logging.critical("trying reconnection to master node...") logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app) self.dispatcher.connectToPrimaryMasterNode(app)
else: else:
app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
# Notify primary master node that a storage node is temporarily down # Notify primary master node that a storage node is temporarily down
...@@ -102,13 +103,13 @@ class ClientEventHandler(EventHandler): ...@@ -102,13 +103,13 @@ class ClientEventHandler(EventHandler):
def peerBroken(self, conn): def peerBroken(self, conn):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid == self.app.primary_master_node.getUUID(): app = self.app
if uuid == app.primary_master_node.getUUID():
logging.critical("primary master node is broken") logging.critical("primary master node is broken")
if self.dispatcher.connecting_to_master_node == 0: if self.dispatcher.connecting_to_master_node == 0:
logging.critical("trying reconnection to master node...") logging.critical("trying reconnection to master node...")
self.dispatcher.connectToPrimaryMasterNode(app) self.dispatcher.connectToPrimaryMasterNode(app)
else: else:
app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
# Notify primary master node that a storage node is broken # Notify primary master node that a storage node is broken
......
...@@ -216,7 +216,7 @@ class Connection(BaseConnection): ...@@ -216,7 +216,7 @@ class Connection(BaseConnection):
elif r == len(msg): elif r == len(msg):
del self.write_buf[:] del self.write_buf[:]
else: else:
self.write_buf = [msg[:r]] self.write_buf = [msg[r:]]
except socket.error, m: except socket.error, m:
if m[0] == errno.EAGAIN: if m[0] == errno.EAGAIN:
return return
......
...@@ -254,8 +254,9 @@ class Packet(object): ...@@ -254,8 +254,9 @@ class Packet(object):
# Encoders. # Encoders.
def encode(self): def encode(self):
msg = pack('!LHL', self._id, self._type, 10 + len(self._body)) + self._body msg = pack('!LHL', self._id, self._type, 10 + len(self._body)) + self._body
logging.debug('encoding %d:%x', self._id, self._type)
if len(msg) > MAX_PACKET_SIZE: if len(msg) > MAX_PACKET_SIZE:
raise ProtocolError(self, 'message too big (%d)' % len(msg)) raise RuntimeError('message too big (%d)' % len(msg))
return msg return msg
__str__ = encode __str__ = encode
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment