Commit 47729240 authored by Vincent Pelletier's avatar Vincent Pelletier

Wait for storage to tell us it is ready before asking it to lock objects.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2366 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 449010db
......@@ -369,6 +369,9 @@ class EventHandler(object):
oid_checksum, max_oid, serial_checksum, max_serial):
raise UnexpectedPacketError
def notifyReady(self, conn):
raise UnexpectedPacketError
# Error packet handlers.
def error(self, conn, code, message):
......@@ -488,6 +491,7 @@ class EventHandler(object):
d[Packets.AnswerCheckTIDRange] = self.answerCheckTIDRange
d[Packets.AskCheckSerialRange] = self.askCheckSerialRange
d[Packets.AnswerCheckSerialRange] = self.answerCheckSerialRange
d[Packets.NotifyReady] = self.notifyReady
return d
......
......@@ -55,6 +55,8 @@ class Application(object):
self.name = config.getCluster()
self.server = config.getBind()
self.storage_readiness = set()
for address in config.getMasters():
self.nm.createMaster(address=address)
......@@ -553,3 +555,12 @@ class Application(object):
logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
def setStorageNotReady(self, uuid):
self.storage_readiness.discard(uuid)
def setStorageReady(self, uuid):
self.storage_readiness.add(uuid)
def isStorageReady(self, uuid):
return uuid in self.storage_readiness
......@@ -68,9 +68,12 @@ class ClientServiceHandler(MasterHandler):
# Collect the UUIDs of nodes related to this transaction.
uuid_set = set()
isStorageReady = app.isStorageReady
for part in partition_set:
uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN))
uuid_set.update((uuid for uuid in (
cell.getUUID() for cell in app.pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN)
if isStorageReady(uuid)))
# check if greater and foreign OID was stored
if self.app.tm.updateLastOID(oid_list):
......
......@@ -30,8 +30,12 @@ class StorageServiceHandler(BaseServiceHandler):
def connectionCompleted(self, conn):
# TODO: unit test
node = self.app.nm.getByUUID(conn.getUUID())
app = self.app
uuid = conn.getUUID()
node = app.nm.getByUUID(uuid)
# XXX: what other values could happen ?
if node.isRunning():
app.setStorageNotReady(uuid)
conn.notify(Packets.StartOperation())
def nodeLost(self, conn, node):
......@@ -141,3 +145,6 @@ class StorageServiceHandler(BaseServiceHandler):
except ConnectorConnectionClosedException:
pass
def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID())
......@@ -1729,6 +1729,13 @@ class AnswerCheckSerialRange(Packet):
# serial_checksum, max_serial
return unpack(self._header_format, body)
class NotifyReady(Packet):
"""
Notify that node is ready to serve requests.
S -> M
"""
pass
class Error(Packet):
"""
Error is a special type of message, because this can be sent against
......@@ -1978,6 +1985,7 @@ class PacketRegistry(dict):
AskCheckSerialRange,
AnswerCheckSerialRange,
)
NotifyReady = register(0x003B, NotifyReady)
# build a "singleton"
Packets = PacketRegistry()
......
......@@ -272,6 +272,7 @@ class Application(object):
or not self.has_last_ids:
self.em.poll(1)
self.ready = True
self.master_conn.notify(Packets.NotifyReady())
def doOperation(self):
"""Handle everything, including replications and transactions."""
......
......@@ -109,6 +109,13 @@ class MasterClientHandlerTests(NeoTestBase):
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
# No packet sent if storage node is not ready
self.assertFalse(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, tid, oid_list)
self.checkNoPacketSent(storage_conn)
# ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, tid, oid_list)
self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1)
......
......@@ -93,6 +93,24 @@ class MasterAppTests(NeoTestBase):
self.checkNoPacketSent(master_conn)
self.checkNotifyNodeInformation(storage_conn)
def test_storageReadinessAPI(self):
uuid_1 = self.getNewUUID()
uuid_2 = self.getNewUUID()
self.assertFalse(self.app.isStorageReady(uuid_1))
self.assertFalse(self.app.isStorageReady(uuid_2))
# Must not raise, nor change readiness
self.app.setStorageNotReady(uuid_1)
self.assertFalse(self.app.isStorageReady(uuid_1))
self.assertFalse(self.app.isStorageReady(uuid_2))
# Mark as ready, only one must change
self.app.setStorageReady(uuid_1)
self.assertTrue(self.app.isStorageReady(uuid_1))
self.assertFalse(self.app.isStorageReady(uuid_2))
self.app.setStorageReady(uuid_2)
# Mark not ready, only one must change
self.app.setStorageNotReady(uuid_1)
self.assertFalse(self.app.isStorageReady(uuid_1))
self.assertTrue(self.app.isStorageReady(uuid_2))
if __name__ == '__main__':
unittest.main()
......
......@@ -272,6 +272,13 @@ class MasterStorageHandlerTests(NeoTestBase):
self.assertTrue(status)
self.assertEqual(self.app.packing, None)
def test_notifyReady(self):
node, conn = self._getStorage()
uuid = node.getUUID()
self.assertFalse(self.app.isStorageReady(uuid))
self.service.notifyReady(conn)
self.assertTrue(self.app.isStorageReady(uuid))
if __name__ == '__main__':
unittest.main()
......@@ -702,6 +702,10 @@ class ProtocolTests(NeoTestBase):
pstatus = p.decode()[0]
self.assertEqual(pstatus, status)
def test_notifyReady(self):
p = Packets.NotifyReady()
self.assertEqual(tuple(), p.decode())
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment