Commit b2f78b00 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix several problems in client node.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@138 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 506f613d
...@@ -2,6 +2,7 @@ from Queue import Queue ...@@ -2,6 +2,7 @@ from Queue import Queue
from threading import Lock from threading import Lock
from ZODB import BaseStorage, ConflictResolution, POSException from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.utils import oid_repr, p64, u64 from ZODB.utils import oid_repr, p64, u64
import logging
from neo.client.dispatcher import Dispatcher from neo.client.dispatcher import Dispatcher
from neo.event import EventManager from neo.event import EventManager
...@@ -15,13 +16,6 @@ class NEOStorageConflictError(NEOStorageError): ...@@ -15,13 +16,6 @@ class NEOStorageConflictError(NEOStorageError):
class NEOStorageNotFoundError(NEOStorageError): class NEOStorageNotFoundError(NEOStorageError):
pass pass
# defined variable used to notify thread of exception
NEO_ERROR = 'neo_error'
NEO_CONFLICT_ERROR = 'neo_conflict_error'
NEO_NOT_FOUND_ERROR = 'neo_not_found_error'
import logging
class NEOStorage(BaseStorage.BaseStorage, class NEOStorage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage): ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient.""" """Wrapper class for neoclient."""
...@@ -53,11 +47,11 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -53,11 +47,11 @@ class NEOStorage(BaseStorage.BaseStorage,
message_queue, request_queue) message_queue, request_queue)
def load(self, oid, version=None): def load(self, oid, version=None):
r = self.app.process_method('load', oid=oid) try:
if r == NEO_NOT_FOUND_ERROR: r = self.app.process_method('load', oid=oid)
raise POSException.POSKeyError (oid)
else:
return r[0], r[1] return r[0], r[1]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
def close(self): def close(self):
return self.app.process_method('close') return self.app.process_method('close')
...@@ -77,58 +71,46 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -77,58 +71,46 @@ class NEOStorage(BaseStorage.BaseStorage,
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
r = self.app.process_method('new_oid') r = self.app.process_method('new_oid')
if r in (NEO_ERROR, NEO_NOT_FOUND_ERROR, NEO_CONFLICT_ERROR): return r
raise NEOStorageError
else:
return r
def tpc_begin(self, transaction, tid=None, status=' '): def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
self._txn_lock_acquire() self._txn_lock_acquire()
r = self.app.process_method('tpc_begin', transaction=transaction, tid=tid, status=status) r = self.app.process_method('tpc_begin', transaction=transaction, tid=tid, status=status)
if r in (NEO_ERROR, NEO_NOT_FOUND_ERROR, NEO_CONFLICT_ERROR): return r
raise NEOStorageError
else:
return r
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
r = self.app.process_method('tpc_vote', transaction=transaction) r = self.app.process_method('tpc_vote', transaction=transaction)
if r in (NEO_ERROR, NEO_NOT_FOUND_ERROR, NEO_CONFLICT_ERROR): return r
raise NEOStorageError
else:
return r
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
try: try:
r = self.app.process_method('tpc_abort', transaction=transaction) r = self.app.process_method('tpc_abort', transaction=transaction)
if r in (NEO_ERROR, NEO_NOT_FOUND_ERROR, NEO_CONFLICT_ERROR): return r
raise NEOStorageError
else:
return r
finally: finally:
self._txn_lock_release() self._txn_lock_release()
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
try: try:
r = self.app.process_method('tpc_finish', transaction=transaction, f=f) r = self.app.process_method('tpc_finish', transaction=transaction, f=f)
if r in (NEO_ERROR, NEO_NOT_FOUND_ERROR, NEO_CONFLICT_ERROR): return r
raise NEOStorageError
else:
return r
finally: finally:
self._txn_lock_release() self._txn_lock_release()
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
r = self.app.process_method('store', oid=oid, serial=serial, data=data, try:
version=version, transaction=transaction) r = self.app.process_method('store', oid = oid, serial = serial,
if r == NEO_CONFLICT_ERROR: data = data, version = version,
transaction = transaction)
return r
except NEOStorageConflictError:
if self.app.conflict_serial <= self.app.tid: if self.app.conflict_serial <= self.app.tid:
# Try to resolve conflict only if conflicting serial is older # Try to resolve conflict only if conflicting serial is older
# than the current transaction ID # than the current transaction ID
...@@ -140,41 +122,31 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -140,41 +122,31 @@ class NEOStorage(BaseStorage.BaseStorage,
raise POSException.ConflictError(oid=oid, raise POSException.ConflictError(oid=oid,
serials=(self.app.tid, serials=(self.app.tid,
serial),data=data) serial),data=data)
elif r in (NEO_ERROR, NEO_NOT_FOUND_ERROR):
raise NEOStorageError
else:
return r
def _clear_temp(self): def _clear_temp(self):
raise NotImplementedError raise NotImplementedError
def getSerial(self, oid): def getSerial(self, oid):
r = self.app.process_method('getSerial', oid=oid) try:
if r == NEO_NOT_FOUND_ERROR: r = self.app.process_method('getSerial', oid = oid)
raise POSException.POSKeyError (oid)
elif r in (NEO_ERROR, NEO_CONFLICT_ERROR):
raise NEOStorageError
else:
return r return r
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
# mutliple revisions # mutliple revisions
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
r = self.app.process_method('loadSerial', oid=oid, serial=serial) try:
if r == NEO_NOT_FOUND_ERROR: r = self.app.process_method('loadSerial', oid=oid, serial=serial)
raise POSException.POSKeyError (oid, serial)
elif r in (NEO_ERROR, NEO_CONFLICT_ERROR):
raise NEOStorageError
else:
return r return r
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid, serial)
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
r = self.app.process_method('loadBefore', oid=oid, tid=tid) try:
if r == NEO_NOT_FOUND_ERROR: r = self.app.process_method('loadBefore', oid=oid, tid=tid)
raise POSException.POSKeyError (oid, tid)
elif r in (NEO_ERROR, NEO_CONFLICT_ERROR):
raise NEOStorageError
else:
return r return r
except NEOStorageNotFoundError:
raise POSException.POSKeyError (oid, tid)
def iterator(self, start=None, stop=None): def iterator(self, start=None, stop=None):
raise NotImplementedError raise NotImplementedError
...@@ -185,13 +157,13 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -185,13 +157,13 @@ class NEOStorage(BaseStorage.BaseStorage,
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
self._txn_lock_acquire() self._txn_lock_acquire()
try: try:
r = self.app.process_method('undo', transaction_id=transaction_id, txn=txn, wrapper=self) try:
if r == NEO_CONFLICT_ERROR: r = self.app.process_method('undo',
raise POSException.ConflictError transaction_id = transaction_id,
elif r in (NEO_ERROR, NOT_FOUND_ERROR): txn = txn, wrapper = self)
raise NEOStorageError
else:
return r return r
except NEOStorageConflictError:
raise POSException.ConflictError
finally: finally:
self._txn_lock_release() self._txn_lock_release()
...@@ -199,10 +171,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -199,10 +171,7 @@ class NEOStorage(BaseStorage.BaseStorage,
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
r = self.undoLog(first, last, filter) r = self.undoLog(first, last, filter)
if r in (NEO_ERROR, NEO_NOT_FOUND_ERROR, NEO_CONFLICT_ERROR): return r
raise NEOStorageError
else:
return r
def supportsUndo(self): def supportsUndo(self):
return 0 return 0
......
...@@ -3,7 +3,7 @@ import os ...@@ -3,7 +3,7 @@ import os
from time import time from time import time
from threading import Lock, local from threading import Lock, local
from cPickle import dumps, loads from cPickle import dumps, loads
from zlib import compress, adler32, decompress from zlib import compress, decompress
from Queue import Queue, Empty from Queue import Queue, Empty
from neo.client.mq import MQ from neo.client.mq import MQ
...@@ -17,9 +17,10 @@ from neo.client.handler import ClientEventHandler ...@@ -17,9 +17,10 @@ from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \ from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn from neo.client.multithreading import ThreadingMixIn
from neo.util import makeChecksum
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.utils import p64, u64 from ZODB.utils import p64, u64, oid_repr
class ConnectionManager(object): class ConnectionManager(object):
"""This class manage a pool of connection to storage node.""" """This class manage a pool of connection to storage node."""
...@@ -337,13 +338,13 @@ class Application(ThreadingMixIn, object): ...@@ -337,13 +338,13 @@ class Application(ThreadingMixIn, object):
logging.error('got wrong oid %s instead of %s from node %s' \ logging.error('got wrong oid %s instead of %s from node %s' \
%(noid, oid, storage_node.getServer())) %(noid, oid, storage_node.getServer()))
continue continue
elif compression and checksum != adler32(data): elif checksum != makeChecksum(data):
# Check checksum if we use compression # Check checksum.
logging.error('wrong checksum from node %s for oid %s' \ logging.error('wrong checksum from node %s for oid %s' \
%(storage_node.getServer(), oid)) %(storage_node.getServer(), oid))
continue continue
else: else:
# Everything looks allright # Everything looks alright.
break break
if self.local_var.asked_object == -1: if self.local_var.asked_object == -1:
...@@ -430,7 +431,7 @@ class Application(ThreadingMixIn, object): ...@@ -430,7 +431,7 @@ class Application(ThreadingMixIn, object):
# Store data on each node # Store data on each node
ddata = dumps(data) ddata = dumps(data)
compressed_data = compress(ddata) compressed_data = compress(ddata)
checksum = adler32(compressed_data) checksum = makeChecksum(compressed_data)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node) conn = self.cm.getConnForNode(storage_node)
if conn is None: if conn is None:
......
...@@ -50,13 +50,15 @@ class ClientEventHandler(EventHandler): ...@@ -50,13 +50,15 @@ class ClientEventHandler(EventHandler):
else: else:
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
logging.info("connection to storage node %s closed" %(node.getServer(),)) if node is not None:
logging.info("connection to storage node %s closed",
node.getServer())
if isinstance(node, StorageNode): if isinstance(node, StorageNode):
# Notify primary master node that a storage node is temporarily down # Notify primary master node that a storage node is temporarily down
conn = app.master_conn conn = app.master_conn
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
ip_address, port = node.getServer() ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(), node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
TEMPORARILY_DOWN_STATE),] TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list) p.notifyNodeInformation(msg_id, node_list)
...@@ -362,7 +364,7 @@ class ClientEventHandler(EventHandler): ...@@ -362,7 +364,7 @@ class ClientEventHandler(EventHandler):
# Storage node handler # Storage node handler
def handleAnwserObject(self, conn, packet, oid, start_serial, end_serial, compression, def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial, compression,
checksum, data): checksum, data):
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
app = self.app app = self.app
......
from threading import Thread from threading import Thread
from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \ from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError, NEO_ERROR, NEO_CONFLICT_ERROR, NEO_NOT_FOUND_ERROR NEOStorageNotFoundError
import logging import logging
class ThreadingMixIn: class ThreadingMixIn:
...@@ -9,24 +9,18 @@ class ThreadingMixIn: ...@@ -9,24 +9,18 @@ class ThreadingMixIn:
def process_method_thread(self, method, kw): def process_method_thread(self, method, kw):
m = getattr(self, method) m = getattr(self, method)
r = None
try: try:
r = m(**kw) r = m(**kw)
self._return_lock_acquire() except Exception, e:
self.returned_data = r r = e.__class__
except NEOStorageConflictError:
self._return_lock_acquire()
self.returned_data = NEO_CONFLICT_ERROR
except NEOStorageNotFoundError:
self._return_lock_acquire()
self.returned_data = NEO_NOT_FOUND_ERROR
except:
self._return_lock_acquire()
self.returned_data = NEO_ERROR
self._return_lock_acquire()
self.returned_data = r
def process_method(self, method, **kw): def process_method(self, method, **kw):
"""Start a new thread to process the method.""" """Start a new thread to process the method."""
# XXX why is it necessary to start a new thread here? -yo
# XXX it is too heavy to create a new thread every time. -yo
t = Thread(target = self.process_method_thread, t = Thread(target = self.process_method_thread,
args = (method, kw)) args = (method, kw))
t.start() t.start()
...@@ -34,6 +28,14 @@ class ThreadingMixIn: ...@@ -34,6 +28,14 @@ class ThreadingMixIn:
# under protection of a lock # under protection of a lock
try: try:
t.join() t.join()
return self.returned_data r = self.returned_data
try:
if issubclass(r, NEOStorageError):
raise r()
elif issubclass(r, Exception):
raise NEOStorageError()
except TypeError:
pass
return r
finally: finally:
self._return_lock_release() self._return_lock_release()
import logging
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \ from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE BROKEN_STATE
from neo.util import dump
class Cell(object): class Cell(object):
"""This class represents a cell in a partition table.""" """This class represents a cell in a partition table."""
...@@ -131,7 +134,28 @@ class PartitionTable(object): ...@@ -131,7 +134,28 @@ class PartitionTable(object):
def hasOffset(self, offset): def hasOffset(self, offset):
return self.partition_list[offset] is not None return self.partition_list[offset] is not None
def log(self):
"""Help debugging partition table management."""
node_list = self.count_dict.keys()
node_list.sort()
node_dict = {}
for i, node in enumerate(node_list):
node_dict[node] = i
for node, i in node_dict.iteritems():
logging.debug('pt: node %d: %s', i, dump(node.getUUID()))
state_dict = { UP_TO_DATE_STATE: 'U',
OUT_OF_DATE_STATE: 'O',
FEEDING_STATE: 'F' }
for offset, row in enumerate(self.partition_list):
desc_list = []
for cell in row:
i = node_dict[cell.getNode()]
s = state_dict[cell.getState()]
desc_list.append('%d %s' % (i, s))
logging.debug('pt: row %d: %s', offset, ', '.join(desc_list))
def operational(self): def operational(self):
self.log()
if not self.filled(): if not self.filled():
return False return False
......
from zlib import adler32
def dump(s): def dump(s):
"""Dump a binary string in hex."""
ret = [] ret = []
for c in s: for c in s:
ret.append('%02x' % ord(c)) ret.append('%02x' % ord(c))
return ''.join(ret) return ''.join(ret)
def makeChecksum(s):
"""Return a 4-byte integer checksum against a string."""
return adler32(s) & 0xffffffff
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment