Commit ea1f2b70 authored by Aurel's avatar Aurel

remove ThreadinMixin class


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@157 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8c4c5df6
......@@ -48,13 +48,12 @@ class NEOStorage(BaseStorage.BaseStorage,
def load(self, oid, version=None):
try:
r = self.app.process_method('load', oid=oid)
return r[0], r[1]
return self.app.load(oid=oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
def close(self):
return self.app.process_method('close')
return self.appclose()
def cleanup(self):
raise NotImplementedError
......@@ -70,35 +69,30 @@ class NEOStorage(BaseStorage.BaseStorage,
def new_oid(self):
if self._is_read_only:
raise POSException.ReadOnlyError()
r = self.app.process_method('new_oid')
return r
return self.app.new_oid()
def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._txn_lock_acquire()
r = self.app.process_method('tpc_begin', transaction=transaction, tid=tid, status=status)
return r
return self.app.tpc_begin(transaction=transaction, tid=tid, status=status)
def tpc_vote(self, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
r = self.app.process_method('tpc_vote', transaction=transaction)
return r
return self.app.tpc_vote(transaction=transaction)
def tpc_abort(self, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
try:
r = self.app.process_method('tpc_abort', transaction=transaction)
return r
return self.app.tpc_abort(transaction=transaction)
finally:
self._txn_lock_release()
def tpc_finish(self, transaction, f=None):
try:
r = self.app.process_method('tpc_finish', transaction=transaction, f=f)
return r
return self.app.tpc_finish(transaction=transaction, f=f)
finally:
self._txn_lock_release()
......@@ -106,10 +100,9 @@ class NEOStorage(BaseStorage.BaseStorage,
if self._is_read_only:
raise POSException.ReadOnlyError()
try:
r = self.app.process_method('store', oid = oid, serial = serial,
data = data, version = version,
transaction = transaction)
return r
return self.app.store(oid = oid, serial = serial,
data = data, version = version,
transaction = transaction)
except NEOStorageConflictError:
if self.app.conflict_serial <= self.app.tid:
# Try to resolve conflict only if conflicting serial is older
......@@ -131,23 +124,20 @@ class NEOStorage(BaseStorage.BaseStorage,
def getSerial(self, oid):
try:
r = self.app.process_method('getSerial', oid = oid)
return r
return self.app.getSerial(oid = oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
# mutliple revisions
def loadSerial(self, oid, serial):
try:
r = self.app.process_method('loadSerial', oid=oid, serial=serial)
return r
return self.app.loadSerial(oid=oid, serial=serial)
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid, serial)
def loadBefore(self, oid, tid):
try:
r = self.app.process_method('loadBefore', oid=oid, tid=tid)
return r
return self.app.loadBefore(oid=oid, tid=tid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid, tid)
......@@ -161,10 +151,8 @@ class NEOStorage(BaseStorage.BaseStorage,
self._txn_lock_acquire()
try:
try:
r = self.app.process_method('undo',
transaction_id = transaction_id,
txn = txn, wrapper = self)
return r
return self.app.undo(transaction_id = transaction_id,
txn = txn, wrapper = self)
except NEOStorageConflictError:
raise POSException.ConflictError
finally:
......@@ -173,8 +161,7 @@ class NEOStorage(BaseStorage.BaseStorage,
def undoLog(self, first, last, filter):
if self._is_read_only:
raise POSException.ReadOnlyError()
r = self.app.undoLog(first, last, filter)
return r
return self.app.undoLog(first, last, filter)
def supportsUndo(self):
return 0
......
......@@ -16,7 +16,6 @@ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, \
from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn
from neo.util import makeChecksum, dump
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
......@@ -48,7 +47,7 @@ class ConnectionPool(object):
try:
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
self.app.uuid, addr[0],
addr[1], self.app.name)
conn.addPacket(p)
......@@ -67,7 +66,7 @@ class ConnectionPool(object):
try:
msg_id = conn.getNextId()
p = Packet()
node_list = [(STORAGE_NODE_TYPE, addr[0], addr[1],
node_list = [(STORAGE_NODE_TYPE, addr[0], addr[1],
node.getUUID(), TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
conn.addPacket(p)
......@@ -121,7 +120,7 @@ class ConnectionPool(object):
self.connection_dict.pop(node.getUUID())
class Application(ThreadingMixIn, object):
class Application(object):
"""The client node application."""
def __init__(self, master_nodes, name, em, dispatcher, request_queue, **kw):
......@@ -402,7 +401,7 @@ class Application(ThreadingMixIn, object):
raise StorageTransactionError(self, transaction)
if serial is None:
serial = INVALID_SERIAL
logging.info('storing oid %s serial %s',
logging.debug('storing oid %s serial %s',
dump(oid), dump(serial))
# Find which storage node to use
partition_id = u64(oid) % self.num_partitions
......@@ -423,7 +422,7 @@ class Application(ThreadingMixIn, object):
try:
msg_id = conn.getNextId()
p = Packet()
p.askStoreObject(msg_id, oid, serial, 1,
p.askStoreObject(msg_id, oid, serial, 1,
checksum, compressed_data, self.tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
......@@ -471,7 +470,7 @@ class Application(ThreadingMixIn, object):
try:
msg_id = conn.getNextId()
p = Packet()
p.askStoreTransaction(msg_id, self.tid, user, desc, ext,
p.askStoreTransaction(msg_id, self.tid, user, desc, ext,
oid_list)
conn.addPacket(p)
conn.expectMessage(msg_id)
......
from threading import Thread
from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError
import logging
class ThreadingMixIn:
"""Mix-in class to handle each method in a new thread."""
def process_method_thread(self, method, kw):
m = getattr(self, method)
try:
r = m(**kw)
except Exception, e:
r = e.__class__
self._return_lock_acquire()
self.returned_data = r
def process_method(self, method, **kw):
"""Start a new thread to process the method."""
# XXX why is it necessary to start a new thread here? -yo
# XXX it is too heavy to create a new thread every time. -yo
return getattr(self, method)(**kw)
t = Thread(target = self.process_method_thread,
args = (method, kw))
t.start()
# wait for thread to be completed, returned value must be
# under protection of a lock
try:
t.join()
r = self.returned_data
try:
if issubclass(r, NEOStorageError):
raise r()
elif issubclass(r, Exception):
raise NEOStorageError()
except TypeError:
pass
return r
finally:
self._return_lock_release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment