Commit b8ba22a3 authored by Vincent Pelletier's avatar Vincent Pelletier

Split getCellListForID into 2 methods (to prepare for a move to strongly-typed language).


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@915 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 9ca59fd5
...@@ -375,10 +375,15 @@ class Application(object): ...@@ -375,10 +375,15 @@ class Application(object):
self._getMasterConnection() self._getMasterConnection()
return self.pt return self.pt
def _getCellListForID(self, id, readable=False, writable=False): def _getCellListForOID(self, oid, readable=False, writable=False):
""" Return the cells available for the specified (O|T)ID """ """ Return the cells available for the specified OID """
pt = self._getPartitionTable() pt = self._getPartitionTable()
return pt.getCellListForID(id, readable, writable) return pt.getCellListForOID(oid, readable, writable)
def _getCellListForTID(self, tid, readable=False, writable=False):
""" Return the cells available for the specified TID """
pt = self._getPartitionTable()
return pt.getCellListForTID(tid, readable, writable)
def _connectToPrimaryMasterNode(self): def _connectToPrimaryMasterNode(self):
logging.debug('connecting to primary master...') logging.debug('connecting to primary master...')
...@@ -523,7 +528,7 @@ class Application(object): ...@@ -523,7 +528,7 @@ class Application(object):
def _load(self, oid, serial=None, tid=None, cache=0): def _load(self, oid, serial=None, tid=None, cache=0):
"""Internal method which manage load ,loadSerial and loadBefore.""" """Internal method which manage load ,loadSerial and loadBefore."""
cell_list = self._getCellListForID(oid, readable=True) cell_list = self._getCellListForOID(oid, readable=True)
if len(cell_list) == 0: if len(cell_list) == 0:
# No cells available, so why are we running ? # No cells available, so why are we running ?
logging.error('oid %s not found because no storage is available for it', dump(oid)) logging.error('oid %s not found because no storage is available for it', dump(oid))
...@@ -651,7 +656,7 @@ class Application(object): ...@@ -651,7 +656,7 @@ class Application(object):
logging.debug('storing oid %s serial %s', logging.debug('storing oid %s serial %s',
dump(oid), dump(serial)) dump(oid), dump(serial))
# Find which storage node to use # Find which storage node to use
cell_list = self._getCellListForID(oid, writable=True) cell_list = self._getCellListForOID(oid, writable=True)
if len(cell_list) == 0: if len(cell_list) == 0:
# FIXME must wait for cluster to be ready # FIXME must wait for cluster to be ready
raise NEOStorageError raise NEOStorageError
...@@ -705,7 +710,7 @@ class Application(object): ...@@ -705,7 +710,7 @@ class Application(object):
oid_list = self.local_var.data_dict.keys() oid_list = self.local_var.data_dict.keys()
# Store data on each node # Store data on each node
pt = self._getPartitionTable() pt = self._getPartitionTable()
cell_list = self._getCellListForID(self.local_var.tid, writable=True) cell_list = self._getCellListForTID(self.local_var.tid, writable=True)
self.local_var.voted_counter = 0 self.local_var.voted_counter = 0
for cell in cell_list: for cell in cell_list:
logging.info("voting object %s %s" %(cell.getServer(), cell.getState())) logging.info("voting object %s %s" %(cell.getServer(), cell.getState()))
...@@ -737,9 +742,9 @@ class Application(object): ...@@ -737,9 +742,9 @@ class Application(object):
cell_set = set() cell_set = set()
# select nodes where objects were stored # select nodes where objects were stored
for oid in self.local_var.data_dict.iterkeys(): for oid in self.local_var.data_dict.iterkeys():
cell_set |= set(self._getCellListForID(oid, writable=True)) cell_set |= set(self._getCellListForOID(oid, writable=True))
# select nodes where transaction was stored # select nodes where transaction was stored
cell_set |= set(self._getCellListForID(self.local_var.tid, writable=True)) cell_set |= set(self._getCellListForTID(self.local_var.tid, writable=True))
# cancel transaction one all those nodes # cancel transaction one all those nodes
for cell in cell_set: for cell in cell_set:
...@@ -797,7 +802,7 @@ class Application(object): ...@@ -797,7 +802,7 @@ class Application(object):
raise StorageTransactionError(self, transaction_id) raise StorageTransactionError(self, transaction_id)
# First get transaction information from a storage node. # First get transaction information from a storage node.
cell_list = self._getCellListForID(transaction_id, writable=True) cell_list = self._getCellListForTID(transaction_id, writable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
...@@ -898,7 +903,7 @@ class Application(object): ...@@ -898,7 +903,7 @@ class Application(object):
undo_info = [] undo_info = []
append = undo_info.append append = undo_info.append
for tid in ordered_tids: for tid in ordered_tids:
cell_list = self._getCellListForID(tid, readable=True) cell_list = self._getCellListForTID(tid, readable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForCell(cell) conn = self.cp.getConnForCell(cell)
...@@ -931,7 +936,7 @@ class Application(object): ...@@ -931,7 +936,7 @@ class Application(object):
# FIXME: filter function isn't used # FIXME: filter function isn't used
def history(self, oid, version=None, length=1, filter=None, object_only=0): def history(self, oid, version=None, length=1, filter=None, object_only=0):
# Get history informations for object first # Get history informations for object first
cell_list = self._getCellListForID(oid, readable=True) cell_list = self._getCellListForOID(oid, readable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
...@@ -963,7 +968,7 @@ class Application(object): ...@@ -963,7 +968,7 @@ class Application(object):
# Now that we have object informations, get txn informations # Now that we have object informations, get txn informations
history_list = [] history_list = []
for serial, size in self.local_var.history[1]: for serial, size in self.local_var.history[1]:
self._getCellListForID(serial, readable=True) self._getCellListForTID(serial, readable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
......
...@@ -114,8 +114,16 @@ class PartitionTable(object): ...@@ -114,8 +114,16 @@ class PartitionTable(object):
except (TypeError, KeyError): except (TypeError, KeyError):
return [] return []
def getCellListForID(self, id, readable=False, writable=False): def getCellListForTID(self, tid, readable=False, writable=False):
return self.getCellList(u64(id) % self.np, readable, writable) return self.getCellList(self._getPartitionFromIndex(u64(tid)),
readable, writable)
def getCellListForOID(self, oid, readable=False, writable=False):
return self.getCellList(self._getPartitionFromIndex(u64(oid)),
readable, writable)
def _getPartitionFromIndex(self, index):
return u64(any_id) % self.np
def setCell(self, offset, node, state): def setCell(self, offset, node, state):
assert state in VALID_CELL_STATE_LIST assert state in VALID_CELL_STATE_LIST
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment