Commit 568eebdd authored by Aurel's avatar Aurel

load and tpc_finish must be serialized


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@186 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 1fc63047
...@@ -166,13 +166,13 @@ class Application(object): ...@@ -166,13 +166,13 @@ class Application(object):
# Internal attribute distinct between thread # Internal attribute distinct between thread
self.local_var = local() self.local_var = local()
# Lock definition : # Lock definition :
# _return_lock is used to return data from thread to ZODB # _load_lock is used to make loading and storing atmic
# _oid_lock is used in order to not call multiple oid # _oid_lock is used in order to not call multiple oid
# generation at the same time # generation at the same time
# _cache_lock is used for the client cache # _cache_lock is used for the client cache
lock = Lock() lock = Lock()
self._return_lock_acquire = lock.acquire self._load_lock_acquire = lock.acquire
self._return_lock_release = lock.release self._load_lock_release = lock.release
lock = Lock() lock = Lock()
self._oid_lock_acquire = lock.acquire self._oid_lock_acquire = lock.acquire
self._oid_lock_release = lock.release self._oid_lock_release = lock.release
...@@ -351,15 +351,19 @@ class Application(object): ...@@ -351,15 +351,19 @@ class Application(object):
def load(self, oid, version=None): def load(self, oid, version=None):
"""Load an object for a given oid.""" """Load an object for a given oid."""
# First try from cache # First try from cache
self._cache_lock_acquire() self._load_lock_acquire()
try: try:
if oid in self.mq_cache: self._cache_lock_acquire()
logging.debug('oid %s is cached', dump(oid)) try:
return loads(self.mq_cache[oid][1]), self.mq_cache[oid][0] if oid in self.mq_cache:
logging.debug('load oid %s is cached', dump(oid))
return loads(self.mq_cache[oid][1]), self.mq_cache[oid][0]
finally:
self._cache_lock_release()
# Otherwise get it from storage node
return self._load(oid, cache=1)[:2]
finally: finally:
self._cache_lock_release() self._load_lock_release()
# Otherwise get it from storage node
return self._load(oid, cache=1)[:2]
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
...@@ -570,41 +574,44 @@ class Application(object): ...@@ -570,41 +574,44 @@ class Application(object):
"""Finish current transaction.""" """Finish current transaction."""
if self.txn is not transaction: if self.txn is not transaction:
return return
self._load_lock_acquire()
# Call function given by ZODB
if f is not None:
f(self.tid)
# Call finish on master
oid_list = self.txn_data_dict.keys()
conn = self.master_conn
conn.lock()
try: try:
msg_id = conn.getNextId() # Call function given by ZODB
p = Packet() if f is not None:
p.finishTransaction(msg_id, oid_list, self.tid) f(self.tid)
conn.addPacket(p)
conn.expectMessage(msg_id, additional_timeout = 300) # Call finish on master
self.dispatcher.register(conn, msg_id, self.getQueue()) oid_list = self.txn_data_dict.keys()
finally: conn = self.master_conn
conn.unlock() conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.finishTransaction(msg_id, oid_list, self.tid)
conn.addPacket(p)
conn.expectMessage(msg_id, additional_timeout = 300)
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
conn.unlock()
# Wait for answer # Wait for answer
self._waitMessage(conn, msg_id) self._waitMessage(conn, msg_id)
if self.txn_finished != 1: if self.txn_finished != 1:
raise NEOStorageError('tpc_finish failed') raise NEOStorageError('tpc_finish failed')
# Update cache # Update cache
self._cache_lock_acquire() self._cache_lock_acquire()
try: try:
for oid in self.txn_data_dict.iterkeys(): for oid in self.txn_data_dict.iterkeys():
ddata = self.txn_data_dict[oid] ddata = self.txn_data_dict[oid]
# Now serial is same as tid # Now serial is same as tid
self.mq_cache[oid] = self.tid, ddata self.mq_cache[oid] = self.tid, ddata
finally:
self._cache_lock_release()
self._clear_txn()
return self.tid
finally: finally:
self._cache_lock_release() self._load_lock_release()
self._clear_txn()
return self.tid
def undo(self, transaction_id, txn, wrapper): def undo(self, transaction_id, txn, wrapper):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment