Commit bf56688c authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix many bugs. Now storage node and master storage seem to be able to operate.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@74 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2272593b
......@@ -286,7 +286,7 @@ class Application(object):
def broadcastPartitionChanges(self, ptid, cell_list):
"""Broadcast a Notify Partition Changes packet."""
for c in em.getConnectionList():
for c in self.em.getConnectionList():
if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID())
if isinstance(n, (ClientNode, StorageNode)):
......@@ -296,7 +296,8 @@ class Application(object):
while size:
amt = min(10000, size)
p = Packet()
p.notifyPartitionChanges(ptid, cell_list[start:start+amt])
p.notifyPartitionChanges(c.getNextId(), ptid,
cell_list[start:start+amt])
c.addPacket(p)
size -= amt
start += amt
......@@ -349,14 +350,14 @@ class Application(object):
# Now I have at least one to ask.
prev_lptid = self.lptid
node = nm.getNodeByUUID(uuid)
node = nm.getNodeByUUID(self.target_uuid)
if node.getState() != RUNNING_STATE:
# Weird. It's dead.
logging.info('the target storage node is dead')
continue
for conn in em.getConnectionList():
if conn.getUUID() == self.lptid:
if conn.getUUID() == self.target_uuid:
break
else:
# Why?
......@@ -487,7 +488,7 @@ class Application(object):
"""Verify the data in storage nodes and clean them up, if necessary."""
logging.info('start to verify data')
handler = VerificationEventHandler()
handler = VerificationEventHandler(self)
em = self.em
nm = self.nm
......@@ -512,11 +513,13 @@ class Application(object):
for offset in xrange(self.num_partitions):
row_list.append((offset, self.pt.getRow(offset)))
if len(row_list) == 1000:
p.sendPartitionTable(self.lptid, row_list)
p.sendPartitionTable(conn.getNextId(),
self.lptid, row_list)
conn.addPacket(p)
del row_list[:]
if len(row_list) != 0:
p.sendPartitionTable(conn.getNextId(), self.lptid, row_list)
p.sendPartitionTable(conn.getNextId(),
self.lptid, row_list)
conn.addPacket(p)
# Gather all unfinished transactions.
......@@ -605,7 +608,7 @@ class Application(object):
a shutdown."""
logging.info('provide service')
handler = ServiceEventHandler()
handler = ServiceEventHandler(self)
em = self.em
nm = self.nm
......
......@@ -6,6 +6,7 @@ from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, INVALID_UUID
from neo.exception import OperationFailure
from neo.util import dump
from neo.node import ClientNode, StorageNode, MasterNode
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
......
......@@ -6,6 +6,7 @@ from neo.master.handler import MasterEventHandler
from neo.exception import VerificationFailure
from neo.protocol import Packet, INVALID_UUID
from neo.util import dump
from neo.node import ClientNode, StorageNode, MasterNode
class VerificationEventHandler(MasterEventHandler):
"""This class deals with events for a verification phase."""
......
......@@ -738,10 +738,10 @@ class Packet(object):
def _decodeAskPartitionTable(self):
try:
n = unpack('!L', self._body[:4])
n = unpack('!L', self._body[:4])[0]
offset_list = []
for i in xrange(n):
offset = unpack('!L', self._body[4+i*4:8+i*4])
offset = unpack('!L', self._body[4+i*4:8+i*4])[0]
offset_list.append(offset)
except:
raise ProtocolError(self, 'invalid ask partition table')
......@@ -813,10 +813,10 @@ class Packet(object):
def _decodeAnswerUnfinishedTransactions(self):
try:
n = unpack('!L', self._body[:4])
n = unpack('!L', self._body[:4])[0]
tid_list = []
for i in xrange(n):
tid = unpack('8s', self._body[4+i*8:12+i*8])
tid = unpack('8s', self._body[4+i*8:12+i*8])[0]
tid_list.append(tid)
except:
raise ProtocolError(self, 'invalid answer unfinished transactions')
......@@ -841,7 +841,7 @@ class Packet(object):
def _decodeDeleteTransaction(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid delete transaction')
return (tid,)
......@@ -849,7 +849,7 @@ class Packet(object):
def _decodeCommitTransaction(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid commit transaction')
return (tid,)
......@@ -861,7 +861,7 @@ class Packet(object):
def _decodeAnswerNewTID(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid answer new tid')
return (tid,)
......@@ -869,7 +869,7 @@ class Packet(object):
def _decodeAskNewOIDs(self):
try:
num_oids = unpack('!H', self._body)
num_oids = unpack('!H', self._body)[0]
except:
raise ProtocolError(self, 'invalid ask new oids')
return (num_oids,)
......@@ -877,10 +877,10 @@ class Packet(object):
def _decodeAnswerNewOIDs(self):
try:
n = unpack('!H', self._body[:2])
n = unpack('!H', self._body[:2])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[2+i*8:10+i*8])
oid = unpack('8s', self._body[2+i*8:10+i*8])[0]
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid answer new oids')
......@@ -892,7 +892,7 @@ class Packet(object):
tid, n = unpack('!8sL', self._body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[12+i*8:20+i*8])
oid = unpack('8s', self._body[12+i*8:20+i*8])[0]
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid finish transaction')
......@@ -901,7 +901,7 @@ class Packet(object):
def _decodeNotifyTransactionFinished(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid notify transactin finished')
return (tid,)
......@@ -909,7 +909,7 @@ class Packet(object):
def _decodeLockInformation(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid lock information')
return (tid,)
......@@ -917,7 +917,7 @@ class Packet(object):
def _decodeNotifyInformationLocked(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid notify information locked')
return (tid,)
......@@ -925,10 +925,10 @@ class Packet(object):
def _decodeInvalidateObjects(self):
try:
n = unpack('!L', self._body[:4])
n = unpack('!L', self._body[:4])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[4+i*8:12+i*8])
oid = unpack('8s', self._body[4+i*8:12+i*8])[0]
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid finish transaction')
......@@ -937,7 +937,7 @@ class Packet(object):
def _decodeUnlockInformation(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid unlock information')
return tid
......@@ -945,7 +945,7 @@ class Packet(object):
def _decodeAbortTransaction(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid abort transaction')
return (tid,)
......@@ -976,15 +976,15 @@ class Packet(object):
tid, oid_len, user_len, desc_len, ext_len \
= unpack('!8sLHHH', self._body[:18])
offset = 18
user = unpack('8s', self._body[offset:offset+user_len])
user = self._body[offset:offset+user_len]
offset += user_len
desc = unpack('8s', self._body[offset:offset+desc_len])
desc = self._body[offset:offset+desc_len]
offset += desc_len
ext = unpack('8s', self._body[offset:offset+ext_len])
ext = self._body[offset:offset+ext_len]
offset += ext_len
oid_list = []
for i in xrange(oid_len):
oid = unpack('8s', self._body[offset:offset+8])
oid = unpack('8s', self._body[offset:offset+8])[0]
offset += 8
oid_list.append(oid)
except:
......@@ -994,7 +994,7 @@ class Packet(object):
def _decodeAnswerStoreTransaction(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid answer store transaction')
return (tid,)
......@@ -1030,10 +1030,10 @@ class Packet(object):
def _decodeAnswerTIDs(self):
try:
n = unpack('!L', self._body[:4])
n = unpack('!L', self._body[:4])[0]
tid_list = []
for i in xrange(n):
tid = unpack('8s', self._body[4+i*8:12+i*8])
tid = unpack('8s', self._body[4+i*8:12+i*8])[0]
tid_list.append(tid)
except:
raise ProtocolError(self, 'invalid answer tids')
......@@ -1042,7 +1042,7 @@ class Packet(object):
def _decodeAskTransactionInformation(self):
try:
tid = unpack('8s', self._body)
tid = unpack('8s', self._body)[0]
except:
raise ProtocolError(self, 'invalid ask transaction information')
return (tid,)
......@@ -1058,10 +1058,11 @@ class Packet(object):
offset += desc_len
oid_list = []
for i in xrange(oid_len):
oid_list.append(unpack('8s', self._body[offset+i*8:offset+8+i*8]))
oid = unpack('8s', self._body[offset+i*8:offset+8+i*8])[0]
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid answer transaction information')
return tid, user, desc
return tid, user, desc, oid_list
decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
def _decodeAskObjectHistory(self):
......
import logging
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE
DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE
class Cell(object):
"""This class represents a cell in a partition table."""
......@@ -175,7 +176,11 @@ class PartitionTable(object):
cell_list.append((offset, uuid, DISCARDED_STATE))
break
del self.count_dict[node]
try:
del self.count_dict[node]
except KeyError:
pass
return cell_list
def addNode(self, node):
......
......@@ -8,6 +8,7 @@ from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection
from neo.protocol import Packet
from neo.pt import PartitionTable
from neo.storage.verification import VerificationEventHandler
class BootstrapEventHandler(StorageEventHandler):
"""This class deals with events for a bootstrap phase."""
......@@ -208,7 +209,11 @@ class BootstrapEventHandler(StorageEventHandler):
app.primary_master_node = primary_node
if app.trying_master_node is primary_node:
# I am connected to the right one.
pass
logging.info('connected to a primary master node')
# This is a workaround to prevent handling of
# packets for the verification phase.
handler = VerificationEventHandler(app)
conn.setHandler(handler)
else:
app.trying_master_node = None
conn.close()
......@@ -222,6 +227,7 @@ class BootstrapEventHandler(StorageEventHandler):
conn.close()
def handleAskLastIDs(self, conn, packet):
logging.info('got ask last ids')
pass
def handleAskPartitionTable(self, conn, packet, offset_list):
......
......@@ -236,10 +236,10 @@ class MySQLDatabaseManager(DatabaseManager):
tid_set = set()
self.begin()
r = q("""SELECT tid FROM ttrans""")
tid_set.add((t[0] for t in r))
tid_set.update((t[0] for t in r))
r = q("""SELECT serial FROM tobj""")
self.commit()
tid_set.add((t[0] for t in r))
tid_set.update((t[0] for t in r))
return list(tid_set)
def objectPresent(self, oid, tid, all = True):
......
......@@ -7,6 +7,7 @@ from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection
from neo.protocol import Packet
from neo.exception import PrimaryFailure, OperationFailure
class TransactionInformation(object):
"""This class represents information on a transaction."""
......
import logging
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.protocol import INVALID_OID, INVALID_TID, INVALID_UUID, \
RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
Packet
from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection
from neo.protocol import Packet
from neo.exception import PrimaryFailure
class VerificationEventHandler(StorageEventHandler):
......@@ -99,8 +100,9 @@ class VerificationEventHandler(StorageEventHandler):
if isinstance(conn, ClientConnection):
app = self.app
p = Packet()
p.answerLastIDs(packet.getId(), app.dm.getLastOID(),
app.dm.getLastTID(), app.ptid)
oid = app.dm.getLastOID() or INVALID_OID
tid = app.dm.getLastTID() or INVALID_TID
p.answerLastIDs(packet.getId(), oid, tid, app.ptid)
conn.addPacket(p)
else:
self.handleUnexpectedPacket(conn, packet)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment