Commit 89c90374 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Remove client transaction lock. Set all transaction related variables as

thread-safe. Add two lock, one for the node manager, another for the partition
table to ensure exclusive access.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@478 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5f4ba33b
......@@ -37,8 +37,6 @@ class Storage(BaseStorage.BaseStorage,
logging.basicConfig(level=logging.DEBUG, format=format)
# Transaction must be under protection of lock
l = Lock()
self._txn_lock_acquire = l.acquire
self._txn_lock_release = l.release
self.app = Application(master_nodes, name, connector)
def load(self, oid, version=None):
......@@ -71,7 +69,6 @@ class Storage(BaseStorage.BaseStorage,
def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._txn_lock_acquire()
return self.app.tpc_begin(transaction=transaction, tid=tid, status=status)
def tpc_vote(self, transaction):
......@@ -82,16 +79,10 @@ class Storage(BaseStorage.BaseStorage,
def tpc_abort(self, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
try:
return self.app.tpc_abort(transaction=transaction)
finally:
self._txn_lock_release()
def tpc_finish(self, transaction, f=None):
try:
return self.app.tpc_finish(transaction=transaction, f=f)
finally:
self._txn_lock_release()
def store(self, oid, serial, data, version, transaction):
app = self.app
......
......@@ -208,38 +208,44 @@ class Application(object):
self.ptid = INVALID_PTID
self.num_replicas = 0
self.num_partitions = 0
#self.handler = ClientEventHandler(self, self.dispatcher)
self.primary_handler = PrimaryEventHandler(self, self.dispatcher)
self.storage_handler = StorageEventHandler(self, self.dispatcher)
#self.answer_handler = ClientAnswerEventHandler(self, self.dispatcher)
# Transaction specific variable
self.tid = None
self.txn = None
self.txn_data_dict = {}
self.txn_object_stored = 0
self.txn_voted = False
self.txn_finished = False
# Internal attribute distinct between thread
self.local_var = local()
self.local_var.txn = None
# Transaction specific variable
self.local_var.data_dict = {}
self.local_var.object_stored = 0
self.local_var.txn_voted = False
self.local_var.txn_finished = False
self.local_var.tid = None
# Lock definition :
# _load_lock is used to make loading and storing atmic
# _oid_lock is used in order to not call multiple oid
# generation at the same time
# _cache_lock is used for the client cache
# _connecting_to_master_node is used to prevent simultaneous master
# node connection attemps
# _load_lock is used to make loading and storing atomic
lock = Lock()
self._load_lock_acquire = lock.acquire
self._load_lock_release = lock.release
# _oid_lock is used in order to not call multiple oid
# generation at the same time
lock = Lock()
self._oid_lock_acquire = lock.acquire
self._oid_lock_release = lock.release
lock = Lock()
# _cache_lock is used for the client cache
self._cache_lock_acquire = lock.acquire
self._cache_lock_release = lock.release
lock = Lock()
# _connecting_to_master_node is used to prevent simultaneous master
# node connection attemps
self._connecting_to_master_node_acquire = lock.acquire
self._connecting_to_master_node_release = lock.release
# _nm ensure exclusive access to the node manager
lock = Lock()
self._nm_acquire = lock.acquire
self._nm_release = lock.release
# __pt ensure exclusive access to the partition table
lock = Lock()
self._pt_acquire = lock.acquire
self._pt_release = lock.release
# Connect to master node
self.connectToPrimaryMasterNode()
if self.uuid == INVALID_UUID:
......@@ -338,7 +344,12 @@ class Application(object):
self.local_var.asked_object = None
while self.local_var.asked_object is None:
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
if len(cell_list) == 0:
sleep(1)
continue
......@@ -448,12 +459,12 @@ class Application(object):
def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction."""
# First get a transaction, only one is allowed at a time
if self.txn is transaction:
if self.local_var.txn is transaction:
# We already begin the same transaction
return
# Get a new transaction id if necessary
if tid is None:
self.tid = None
self.local_var.tid = None
conn = self.master_conn
if conn is None:
raise NEOStorageError("Connection to master node failed")
......@@ -469,16 +480,16 @@ class Application(object):
conn.unlock()
# Wait for answer
self._waitPrimaryMessage(conn, msg_id)
if self.tid is None:
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
else:
self.tid = tid
self.txn = transaction
self.local_var.tid = tid
self.local_var.txn = transaction
def store(self, oid, serial, data, version, transaction):
"""Store object."""
if transaction is not self.txn:
if transaction is not self.local_var.txn:
raise StorageTransactionError(self, transaction)
if serial is None:
serial = INVALID_SERIAL
......@@ -486,7 +497,11 @@ class Application(object):
dump(oid), dump(serial))
# Find which storage node to use
partition_id = u64(oid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, writable=True)
finally:
self._pt_release()
if len(cell_list) == 0:
# FIXME must wait for cluster to be ready
raise NEOStorageError
......@@ -503,44 +518,48 @@ class Application(object):
msg_id = conn.getNextId()
p = Packet()
p.askStoreObject(msg_id, oid, serial, 1,
checksum, compressed_data, self.tid)
checksum, compressed_data, self.local_var.tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.txn_object_stored = 0
self.local_var.object_stored = 0
finally:
conn.unlock()
# Check we don't get any conflict
self._waitStorageMessage(conn, msg_id)
if self.txn_object_stored[0] == -1:
if self.txn_data_dict.has_key(oid):
if self.local_var.object_stored[0] == -1:
if self.local_var.data_dict.has_key(oid):
# One storage already accept the object, is it normal ??
# remove from dict and raise ConflictError, don't care of
# previous node which already store data as it would be resent
# again if conflict is resolved or txn will be aborted
del self.txn_data_dict[oid]
self.conflict_serial = self.txn_object_stored[1]
del self.local_var.data_dict[oid]
self.conflict_serial = self.local_var.object_stored[1]
raise NEOStorageConflictError
# Store object in tmp cache
noid, nserial = self.txn_object_stored
self.txn_data_dict[oid] = data
noid, nserial = self.local_var.object_stored
self.local_var.data_dict[oid] = data
return self.tid
return self.local_var.tid
def tpc_vote(self, transaction):
"""Store current transaction."""
if transaction is not self.txn:
if transaction is not self.local_var.txn:
raise StorageTransactionError(self, transaction)
user = transaction.user
desc = transaction.description
ext = dumps(transaction._extension)
oid_list = self.txn_data_dict.keys()
oid_list = self.local_var.data_dict.keys()
# Store data on each node
partition_id = u64(self.tid) % self.num_partitions
partition_id = u64(self.local_var.tid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, writable=True)
finally:
self._pt_release()
for cell in cell_list:
logging.info("voting object %s %s" %(cell.getServer(), cell.getState()))
conn = self.cp.getConnForNode(cell)
......@@ -550,12 +569,12 @@ class Application(object):
try:
msg_id = conn.getNextId()
p = Packet()
p.askStoreTransaction(msg_id, self.tid, user, desc, ext,
p.askStoreTransaction(msg_id, self.local_var.tid, user, desc, ext,
oid_list)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.txn_voted = False
self.local_var.txn_voted = False
finally:
conn.unlock()
......@@ -565,27 +584,31 @@ class Application(object):
def _clear_txn(self):
"""Clear some transaction parameters."""
self.tid = None
self.txn = None
self.txn_data_dict.clear()
self.txn_voted = False
self.txn_finished = False
self.local_var.tid = None
self.local_var.txn = None
self.local_var.data_dict.clear()
self.local_var.txn_voted = False
self.local_var.txn_finished = False
def tpc_abort(self, transaction):
"""Abort current transaction."""
if transaction is not self.txn:
if transaction is not self.local_var.txn:
return
cell_set = set()
self._pt_acquire()
try:
# select nodes where objects were stored
for oid in self.txn_data_dict.iterkeys():
for oid in self.local_var.data_dict.iterkeys():
partition_id = u64(oid) % self.num_partitions
cell_set |= set(self.pt.getCellList(partition_id, writable=True))
# select nodes where transaction was stored
partition_id = u64(self.tid) % self.num_partitions
partition_id = u64(self.local_var.tid) % self.num_partitions
cell_set |= set(self.pt.getCellList(partition_id, writable=True))
finally:
self._pt_release()
# cancel transaction one all those nodes
for cell in cell_set:
......@@ -593,7 +616,7 @@ class Application(object):
if conn is None:
continue
try:
conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.tid))
conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.local_var.tid))
finally:
conn.unlock()
......@@ -601,7 +624,7 @@ class Application(object):
conn = self.master_conn
conn.lock()
try:
conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.tid))
conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.local_var.tid))
finally:
conn.unlock()
......@@ -609,22 +632,22 @@ class Application(object):
def tpc_finish(self, transaction, f=None):
"""Finish current transaction."""
if self.txn is not transaction:
if self.local_var.txn is not transaction:
return
self._load_lock_acquire()
try:
# Call function given by ZODB
if f is not None:
f(self.tid)
f(self.local_var.tid)
# Call finish on master
oid_list = self.txn_data_dict.keys()
oid_list = self.local_var.data_dict.keys()
conn = self.master_conn
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.finishTransaction(msg_id, oid_list, self.tid)
p.finishTransaction(msg_id, oid_list, self.local_var.tid)
conn.addPacket(p)
conn.expectMessage(msg_id, additional_timeout = 300)
self.dispatcher.register(conn, msg_id, self.getQueue())
......@@ -640,24 +663,28 @@ class Application(object):
# Update cache
self._cache_lock_acquire()
try:
for oid in self.txn_data_dict.iterkeys():
data = self.txn_data_dict[oid]
for oid in self.local_var.data_dict.iterkeys():
data = self.local_var.data_dict[oid]
# Now serial is same as tid
self.mq_cache[oid] = self.tid, data
self.mq_cache[oid] = self.local_var.tid, data
finally:
self._cache_lock_release()
self._clear_txn()
return self.tid
return self.local_var.tid
finally:
self._load_lock_release()
def undo(self, transaction_id, txn, wrapper):
if txn is not self.txn:
if txn is not self.local_var.txn:
raise StorageTransactionError(self, transaction_id)
# First get transaction information from a storage node.
partition_id = u64(transaction_id) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, writable=True)
finally:
self._pt_release()
shuffle(cell_list)
for cell in cell_list:
conn = self.cp.getConnForNode(cell)
......@@ -711,15 +738,15 @@ class Application(object):
try:
self.store(oid, transaction_id, data, None, txn)
except NEOStorageConflictError, serial:
if serial <= self.tid:
new_data = wrapper.tryToResolveConflict(oid, self.tid,
if serial <= self.local_var.tid:
new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid,
serial, data)
if new_data is not None:
self.store(oid, self.tid, new_data, None, txn)
self.store(oid, self.local_var.tid, new_data, None, txn)
continue
raise ConflictError(oid = oid, serials = (self.tid, serial),
raise ConflictError(oid = oid, serials = (self.local_var.tid, serial),
data = data)
return self.tid, oid_list
return self.local_var.tid, oid_list
def undoLog(self, first, last, filter=None, block=0):
if last < 0:
......@@ -727,10 +754,13 @@ class Application(object):
last = first - last
# First get a list of transactions from all storage nodes.
#storage_node_list = [x for x in self.pt.getNodeList() if x.getState() \
# in (UP_TO_DATE_STATE, FEEDING_STATE)]
# FIXME: should we filter the node list with usable cells ?
# Each storage node will return TIDs only for UP_TO_DATE_STATE and
# FEEDING_STATE cells
self._pt_acquire()
try:
storage_node_list = self.pt.getNodeList()
finally:
self._pt_release()
self.local_var.node_tids = {}
for storage_node in storage_node_list:
......@@ -766,7 +796,11 @@ class Application(object):
undo_info = []
for tid in ordered_tids:
partition_id = u64(tid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
shuffle(cell_list)
for cell in cell_list:
conn = self.cp.getConnForNode(storage_node)
......@@ -817,7 +851,11 @@ class Application(object):
def history(self, oid, version=None, length=1, filter=None, object_only=0):
# Get history informations for object first
partition_id = u64(oid) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
shuffle(cell_list)
for cell in cell_list:
......@@ -854,7 +892,11 @@ class Application(object):
history_list = []
for serial, size in self.local_var.history[1]:
partition_id = u64(serial) % self.num_partitions
self._pt_acquire()
try:
cell_list = self.pt.getCellList(partition_id, readable=True)
finally:
self._pt_release()
shuffle(cell_list)
for cell in cell_list:
......@@ -914,6 +956,7 @@ class Application(object):
logging.debug('already connected')
return
if self.pt is not None:
# pt is protected with the master lock
self.pt.clear()
master_index = 0
conn = None
......@@ -935,9 +978,13 @@ class Application(object):
handler = PrimaryBoostrapEventHandler(self, self.dispatcher)
conn = MTClientConnection(self.em, handler, (addr, port),
connector_handler=self.connector_handler)
self._nm_acquire()
try:
if self.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
self.nm.add(n)
finally:
self._nm_release()
conn.lock()
try:
......@@ -992,23 +1039,23 @@ class Application(object):
return self.local_var.node_ready
def setTID(self, value):
self.tid = value
self.local_var.tid = value
def getTID(self):
return self.tid
return self.local_var.tid
def getConflictSerial(self):
return self.conflict_serial
def setTransactionFinished(self):
self.txn_finished = True
self.local_var.txn_finished = True
def isTransactionFinished(self):
return self.txn_finished
return self.local_var.txn_finished
def setTransactionVoted(self):
self.txn_voted = True
self.local_var.txn_voted = True
def isTransactionVoted(self):
return self.txn_voted
return self.local_var.txn_voted
......@@ -478,9 +478,9 @@ class StorageEventHandler(BaseClientEventHandler):
def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
app = self.app
if conflicting:
app.txn_object_stored = -1, serial
app.local_var.object_stored = -1, serial
else:
app.txn_object_stored = oid, serial
app.local_var.object_stored = oid, serial
def handleAnswerStoreTransaction(self, conn, packet, tid):
app = self.app
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment