Commit 972ef850 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Define our own class to manage thread-safe data and initialize them propely.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@547 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 015b87ee
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import logging import logging
import os import os
from threading import local from thread import get_ident
from cPickle import dumps, loads from cPickle import dumps, loads
from zlib import compress, decompress from zlib import compress, decompress
from Queue import Queue, Empty from Queue import Queue, Empty
...@@ -174,6 +174,43 @@ class ConnectionPool(object): ...@@ -174,6 +174,43 @@ class ConnectionPool(object):
self.connection_lock_release() self.connection_lock_release()
class ThreadContext(object):
_threads_dict = {}
def __getThreadData(self):
id = get_ident()
try:
result = self._threads_dict[id]
except KeyError:
self.clear(id)
result = self._threads_dict[id]
return result
def __getattr__(self, name):
thread_data = self.__getThreadData()
try:
return thread_data[name]
except KeyError:
raise AttributeError, name
def __setattr__(self, name, value):
thread_data = self.__getThreadData()
thread_data[name] = value
def clear(self, id=None):
if id is None:
id = get_ident()
self._threads_dict[id] = {
'tid': None,
'txn': None,
'data_dict': {},
'object_stored': 0,
'txn_voted': False,
'txn_finished': False,
}
class Application(object): class Application(object):
"""The client node application.""" """The client node application."""
...@@ -204,8 +241,7 @@ class Application(object): ...@@ -204,8 +241,7 @@ class Application(object):
self.primary_handler = PrimaryEventHandler(self, self.dispatcher) self.primary_handler = PrimaryEventHandler(self, self.dispatcher)
self.storage_handler = StorageEventHandler(self, self.dispatcher) self.storage_handler = StorageEventHandler(self, self.dispatcher)
# Internal attribute distinct between thread # Internal attribute distinct between thread
self.local_var = local() self.local_var = ThreadContext()
self._clear_txn()
# Lock definition : # Lock definition :
# _load_lock is used to make loading and storing atomic # _load_lock is used to make loading and storing atomic
lock = Lock() lock = Lock()
...@@ -441,9 +477,6 @@ class Application(object): ...@@ -441,9 +477,6 @@ class Application(object):
def tpc_begin(self, transaction, tid=None, status=' '): def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction.""" """Begin a new transaction."""
# dirty, but we need an initialization point for each thread
if getattr(self.local_var, 'txn', None) is None:
self._clear_txn()
# First get a transaction, only one is allowed at a time # First get a transaction, only one is allowed at a time
if self.local_var.txn is transaction: if self.local_var.txn is transaction:
# We already begin the same transaction # We already begin the same transaction
...@@ -538,15 +571,6 @@ class Application(object): ...@@ -538,15 +571,6 @@ class Application(object):
if not self.isTransactionVoted(): if not self.isTransactionVoted():
raise NEOStorageError('tpc_vote failed') raise NEOStorageError('tpc_vote failed')
def _clear_txn(self):
"""Clear some transaction parameters."""
self.local_var.tid = None
self.local_var.txn = None
self.local_var.data_dict = {}
self.local_var.object_stored = 0
self.local_var.txn_voted = False
self.local_var.txn_finished = False
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
"""Abort current transaction.""" """Abort current transaction."""
if transaction is not self.local_var.txn: if transaction is not self.local_var.txn:
...@@ -584,8 +608,7 @@ class Application(object): ...@@ -584,8 +608,7 @@ class Application(object):
conn.notify(protocol.abortTransaction(self.local_var.tid)) conn.notify(protocol.abortTransaction(self.local_var.tid))
finally: finally:
conn.unlock() conn.unlock()
self.local_var.clear()
self._clear_txn()
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
"""Finish current transaction.""" """Finish current transaction."""
...@@ -614,7 +637,7 @@ class Application(object): ...@@ -614,7 +637,7 @@ class Application(object):
self.mq_cache[oid] = self.local_var.tid, data self.mq_cache[oid] = self.local_var.tid, data
finally: finally:
self._cache_lock_release() self._cache_lock_release()
self._clear_txn() self.local_var.clear()
return self.local_var.tid return self.local_var.tid
finally: finally:
self._load_lock_release() self._load_lock_release()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment