Commit f99797e4 authored by Aurel's avatar Aurel

mainly fix computation of partition id + other fix in transaction method


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@115 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3da553a7
...@@ -19,7 +19,7 @@ from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \ ...@@ -19,7 +19,7 @@ from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
from neo.client.multithreading import ThreadingMixIn from neo.client.multithreading import ThreadingMixIn
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.utils import p64, u64
class ConnectionManager(object): class ConnectionManager(object):
"""This class manage a pool of connection to storage node.""" """This class manage a pool of connection to storage node."""
...@@ -298,7 +298,7 @@ class Application(ThreadingMixIn, object): ...@@ -298,7 +298,7 @@ class Application(ThreadingMixIn, object):
def _load(self, oid, serial=INVALID_TID, tid=INVALID_TID, cache=0): def _load(self, oid, serial=INVALID_TID, tid=INVALID_TID, cache=0):
"""Internal method which manage load ,loadSerial and loadBefore.""" """Internal method which manage load ,loadSerial and loadBefore."""
partition_id = oid % self.num_partitions partition_id = u64(oid) % self.num_partitions
# Only used up to date node for retrieving object # Only used up to date node for retrieving object
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \ storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE] if x.getState() == UP_TO_DATE_STATE]
...@@ -311,7 +311,7 @@ class Application(ThreadingMixIn, object): ...@@ -311,7 +311,7 @@ class Application(ThreadingMixIn, object):
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askObject(msg_id, str(oid), str(serial), str(tid)) p.askObject(msg_id, oid, serial, tid)
self.local_var.tmp_q = Queue(1) self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
...@@ -416,8 +416,10 @@ class Application(ThreadingMixIn, object): ...@@ -416,8 +416,10 @@ class Application(ThreadingMixIn, object):
"""Store object.""" """Store object."""
if transaction is not self.txn: if transaction is not self.txn:
raise StorageTransactionError(self, transaction) raise StorageTransactionError(self, transaction)
if serial is None:
serial = INVALID_SERIAL
# Find which storage node to use # Find which storage node to use
partition_id = oid % self.num_partitions partition_id = u64(oid) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
# Store data on each node # Store data on each node
...@@ -430,7 +432,7 @@ class Application(ThreadingMixIn, object): ...@@ -430,7 +432,7 @@ class Application(ThreadingMixIn, object):
continue continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askStoreObject(msg_id, oid, serial, 1, checksum, compressed_data) p.askStoreObject(msg_id, oid, serial, 1, checksum, compressed_data, self.tid)
self.local_var.tmp_q = Queue(1) self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
...@@ -460,7 +462,7 @@ class Application(ThreadingMixIn, object): ...@@ -460,7 +462,7 @@ class Application(ThreadingMixIn, object):
ext = dumps(transaction._extension) ext = dumps(transaction._extension)
oid_list = self.txn_data_dict.keys() oid_list = self.txn_data_dict.keys()
# Store data on each node # Store data on each node
partition_id = self.tid % self.num_partitions partition_id = u64(self.tid) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node) conn = self.cm.getConnForNode(storage_node)
...@@ -494,7 +496,7 @@ class Application(ThreadingMixIn, object): ...@@ -494,7 +496,7 @@ class Application(ThreadingMixIn, object):
# Abort txn in node where objects were stored # Abort txn in node where objects were stored
aborted_node = {} aborted_node = {}
for oid in self.self.txn_data_dict.iterkeys(): for oid in self.self.txn_data_dict.iterkeys():
partition_id = oid % self.num_partitions partition_id = u64(oid) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
...@@ -508,7 +510,7 @@ class Application(ThreadingMixIn, object): ...@@ -508,7 +510,7 @@ class Application(ThreadingMixIn, object):
aborted_node[storage_node] = 1 aborted_node[storage_node] = 1
# Abort in nodes where transaction was stored # Abort in nodes where transaction was stored
partition_id = self.tid % self.num_partitions partition_id = u64(self.tid) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
...@@ -531,10 +533,11 @@ class Application(ThreadingMixIn, object): ...@@ -531,10 +533,11 @@ class Application(ThreadingMixIn, object):
if f is not None: if f is not None:
f() f()
# Call finish on master # Call finish on master
oid_list = self.txn_data_dict.keys()
conn = self.master_conn conn = self.master_conn
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.finishTransaction(msg_id, self.oid_list, self.tid) p.finishTransaction(msg_id, oid_list, self.tid)
self.local_var.tmp_q = Queue(1) self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer # Wait for answer
...@@ -560,7 +563,7 @@ class Application(ThreadingMixIn, object): ...@@ -560,7 +563,7 @@ class Application(ThreadingMixIn, object):
raise StorageTransactionError(self, transaction_id) raise StorageTransactionError(self, transaction_id)
# First get transaction information from master node # First get transaction information from master node
partition_id = transaction_id % self.num_partitions partition_id = u64(transaction_id) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node) conn = self.cm.getConnForNode(storage_node)
...@@ -656,7 +659,7 @@ class Application(ThreadingMixIn, object): ...@@ -656,7 +659,7 @@ class Application(ThreadingMixIn, object):
# For each transaction, get info # For each transaction, get info
undo_info = [] undo_info = []
for tid in ordered_tids: for tid in ordered_tids:
partition_id = tid % self.num_partitions partition_id = u64(tid) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node) conn = self.cm.getConnForNode(storage_node)
...@@ -693,7 +696,7 @@ class Application(ThreadingMixIn, object): ...@@ -693,7 +696,7 @@ class Application(ThreadingMixIn, object):
def history(self, oid, version, length=1, filter=None, object_only=0): def history(self, oid, version, length=1, filter=None, object_only=0):
# Get history informations for object first # Get history informations for object first
partition_id = oid % self.num_partitions partition_id = u64(oid) % self.num_partitions
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \ storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE] if x.getState() == UP_TO_DATE_STATE]
for storage_node in storage_node_list: for storage_node in storage_node_list:
...@@ -722,7 +725,7 @@ class Application(ThreadingMixIn, object): ...@@ -722,7 +725,7 @@ class Application(ThreadingMixIn, object):
# Now that we have object informations, get txn informations # Now that we have object informations, get txn informations
history_list = [] history_list = []
for serial, size in self.local_var.hisory[1]: for serial, size in self.local_var.hisory[1]:
partition_id = serial % self.num_partitions partition_id = u64(serial) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node) conn = self.cm.getConnForNode(storage_node)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment