Commit c96029cd authored by Julien Muchembled's avatar Julien Muchembled

Fix random failures in threading tests when big packets are transmitted

parent c154e45a
...@@ -30,7 +30,7 @@ from neo.client import Storage ...@@ -30,7 +30,7 @@ from neo.client import Storage
from neo.lib import bootstrap, setupLog from neo.lib import bootstrap, setupLog
from neo.lib.connection import BaseConnection, Connection from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \ from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException ConnectorConnectionRefusedException, ConnectorTryAgainException
from neo.lib.event import EventManager from neo.lib.event import EventManager
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import SOCKET_CONNECTORS_DICT, parseMasterList from neo.lib.util import SOCKET_CONNECTORS_DICT, parseMasterList
...@@ -471,6 +471,7 @@ class NEOCluster(object): ...@@ -471,6 +471,7 @@ class NEOCluster(object):
SocketConnector.makeClientConnection) SocketConnector.makeClientConnection)
SocketConnector_makeListeningConnection = staticmethod( SocketConnector_makeListeningConnection = staticmethod(
SocketConnector.makeListeningConnection) SocketConnector.makeListeningConnection)
SocketConnector_receive = staticmethod(SocketConnector.receive)
SocketConnector_send = staticmethod(SocketConnector.send) SocketConnector_send = staticmethod(SocketConnector.send)
Storage__init__ = staticmethod(Storage.__init__) Storage__init__ = staticmethod(Storage.__init__)
_patch_count = 0 _patch_count = 0
...@@ -500,6 +501,20 @@ class NEOCluster(object): ...@@ -500,6 +501,20 @@ class NEOCluster(object):
if type(Serialized.pending) is not frozenset: if type(Serialized.pending) is not frozenset:
Serialized.pending = 1 Serialized.pending = 1
return result return result
def receive(self):
# If the peer sent an entire packet, make sure we read it entirely,
# otherwise Serialize.pending would be reset to 0.
data = ''
try:
while True:
d = cls.SocketConnector_receive(self)
if not d:
return data
data += d
except ConnectorTryAgainException:
if data:
return data
raise
# TODO: 'sleep' should 'tic' in a smart way, so that storages can be # TODO: 'sleep' should 'tic' in a smart way, so that storages can be
# safely started even if the cluster isn't. # safely started even if the cluster isn't.
bootstrap.sleep = lambda seconds: None bootstrap.sleep = lambda seconds: None
...@@ -507,6 +522,7 @@ class NEOCluster(object): ...@@ -507,6 +522,7 @@ class NEOCluster(object):
SocketConnector.makeClientConnection = makeClientConnection SocketConnector.makeClientConnection = makeClientConnection
SocketConnector.makeListeningConnection = lambda self, addr: \ SocketConnector.makeListeningConnection = lambda self, addr: \
cls.SocketConnector_makeListeningConnection(self, BIND) cls.SocketConnector_makeListeningConnection(self, BIND)
SocketConnector.receive = receive
SocketConnector.send = send SocketConnector.send = send
Storage.setupLog = lambda *args, **kw: None Storage.setupLog = lambda *args, **kw: None
Serialized.init() Serialized.init()
...@@ -524,6 +540,7 @@ class NEOCluster(object): ...@@ -524,6 +540,7 @@ class NEOCluster(object):
cls.SocketConnector_makeClientConnection cls.SocketConnector_makeClientConnection
SocketConnector.makeListeningConnection = \ SocketConnector.makeListeningConnection = \
cls.SocketConnector_makeListeningConnection cls.SocketConnector_makeListeningConnection
SocketConnector.receive = cls.SocketConnector_receive
SocketConnector.send = cls.SocketConnector_send SocketConnector.send = cls.SocketConnector_send
Storage.setupLog = setupLog Storage.setupLog = setupLog
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment