Commit ea6b1aaa authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix the store method. Reduce the timeout of poll more. Add more debug...

Fix the store method. Reduce the timeout of poll more. Add more debug messages. Fix some bugs in TID generation. Increase timeout for Lock Information. Choose a storage node randomly in _load.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@147 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2730315a
......@@ -114,11 +114,14 @@ class NEOStorage(BaseStorage.BaseStorage,
if self.app.conflict_serial <= self.app.tid:
# Try to resolve conflict only if conflicting serial is older
# than the current transaction ID
new_data = self.tryToResolveConflict(oid, self.app.tid,
serial, data)
new_data = self.tryToResolveConflict(oid,
self.app.conflict_serial,
serial, data)
if new_data is not None:
# Try again after conflict resolution
return self.store(oid, serial, new_data, version, transaction)
self.store(oid, self.app.conflict_serial,
new_data, version, transaction)
return ConflictResolution.ResolvedSerial
raise POSException.ConflictError(oid=oid,
serials=(self.app.tid,
serial),data=data)
......
......@@ -5,6 +5,7 @@ from threading import Lock, local
from cPickle import dumps, loads
from zlib import compress, decompress
from Queue import Queue, Empty
from random import shuffle
from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
......@@ -17,7 +18,7 @@ from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn
from neo.util import makeChecksum
from neo.util import makeChecksum, dump
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.utils import p64, u64, oid_repr
......@@ -302,16 +303,19 @@ class Application(ThreadingMixIn, object):
"""Internal method which manage load ,loadSerial and loadBefore."""
partition_id = u64(oid) % self.num_partitions
# Only used up to date node for retrieving object
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE]
cell_list = self.pt.getCellList(partition_id, True)
data = None
# Store data on each node
if len(storage_node_list) == 0:
if len(cell_list) == 0:
# FIXME must wait for cluster to be ready
raise NEOStorageNotFoundError()
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node)
shuffle(cell_list)
self.local_var.asked_object = -1
for cell in cell_list:
logging.debug('trying to load %s from %s',
dump(oid), dump(cell.getUUID()))
conn = self.cm.getConnForNode(cell)
if conn is None:
continue
msg_id = conn.getNextId()
......@@ -350,6 +354,7 @@ class Application(ThreadingMixIn, object):
if self.local_var.asked_object == -1:
# We didn't got any object from all storage node
logging.debug('oid %s not found', dump(oid))
raise NEOStorageNotFoundError()
# Uncompress data
......@@ -374,6 +379,7 @@ class Application(ThreadingMixIn, object):
self._cache_lock_acquire()
try:
if oid in self.mq_cache:
logging.debug('oid %s is cached', dump(oid))
return loads(self.mq_cache[oid][1]), self.mq_cache[oid][0]
finally:
self._cache_lock_release()
......@@ -383,13 +389,15 @@ class Application(ThreadingMixIn, object):
def loadSerial(self, oid, serial):
"""Load an object for a given oid and serial."""
# Do not try in cache as it managed only up-to-date object
# Do not try in cache as it manages only up-to-date object
logging.debug('loading %s at %s', dump(oid), dump(serial))
return self._load(oid, serial)[:2], None
def loadBefore(self, oid, tid):
"""Load an object for a given oid before tid committed."""
# Do not try in cache as it managed only up-to-date object
# Do not try in cache as it manages only up-to-date object
logging.debug('loading %s before %s', dump(oid), dump(tid))
return self._load(oid, tid)
......@@ -423,9 +431,11 @@ class Application(ThreadingMixIn, object):
raise StorageTransactionError(self, transaction)
if serial is None:
serial = INVALID_SERIAL
logging.info('storing oid %s serial %s',
dump(oid), dump(serial))
# Find which storage node to use
partition_id = u64(oid) % self.num_partitions
storage_node_list = self.pt.getCellList(partition_id, True)
storage_node_list = self.pt.getCellList(partition_id, False)
if len(storage_node_list) == 0:
# FIXME must wait for cluster to be ready
raise NEOStorageError
......@@ -460,6 +470,8 @@ class Application(ThreadingMixIn, object):
noid, nserial = self.txn_object_stored
self.txn_data_dict[oid] = ddata
return self.tid
def tpc_vote(self, transaction):
"""Store current transaction."""
......
......@@ -24,7 +24,7 @@ class Dispatcher(Thread):
# First check if we receive any new message from other node
m = None
try:
self.em.poll(0.2)
self.em.poll(0.02)
except KeyError:
# This happen when there is no connection
logging.error('Dispatcher, run, poll returned a KeyError')
......
......@@ -356,7 +356,7 @@ class Application(object):
# Now I have at least one to ask.
prev_lptid = self.lptid
node = nm.getNodeByUUID(self.target_uuid)
if node.getState() != RUNNING_STATE:
if node is None or node.getState() != RUNNING_STATE:
# Weird. It's dead.
logging.info('the target storage node is dead')
continue
......@@ -712,8 +712,8 @@ class Application(object):
def getNextTID(self):
tm = time()
gmt = gmtime(tm)
upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon) * 31 + gmt.tm_mday - 1) \
* 24 + gmt.tm_hour) * 60 + gmt.tm_min
upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
+ gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
tid = pack('!LL', upper, lower)
if tid <= self.ltid:
......@@ -721,13 +721,11 @@ class Application(object):
if lower == 0xffffffff:
# This should not happen usually.
from datetime import timedelta, datetime
hour, min = divmod(upper, 60)
day, hour = divmod(hour, 24)
month, day = divmod(day, 31)
year, month = divmod(month, 12)
d = datetime(year, month, day + 1, hour, min) + timedelta(0, 60)
upper = (((d.year * 12 + d.month) * 31 + d.day - 1) \
* 24 + d.hour) * 60 + d.minute
d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday,
gmt.tm_hour, gmt.tm_min) \
+ timedelta(0, 60)
upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
+ d.day - 1) * 24 + d.hour) * 60 + d.minute
lower = 0
else:
lower += 1
......
......@@ -438,7 +438,7 @@ class ServiceEventHandler(MasterEventHandler):
if c.getUUID() in uuid_set:
msg_id = c.getNextId()
c.addPacket(Packet().lockInformation(msg_id, tid))
c.expectMessage(msg_id)
c.expectMessage(msg_id, timeout = 60)
t = FinishingTransaction(conn, packet, oid_list, uuid_set)
app.finishing_transaction_dict[tid] = t
......
......@@ -3,6 +3,7 @@ from MySQLdb import OperationalError
from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
import logging
from array import array
import string
from neo.storage.database import DatabaseManager
from neo.exception import DatabaseFailure
......@@ -52,7 +53,14 @@ class MySQLDatabaseManager(DatabaseManager):
"""Query data from a database."""
conn = self.conn
try:
logging.debug('querying %s...', query.split('\n', 1)[0])
printable_char_list = []
for c in query.split('\n', 1)[0]:
if c not in string.printable or c in '\t\x0b\x0c\r':
c = '\\x%02x' % ord(c)
printable_char_list.append(c)
query_part = ''.join(printable_char_list)
logging.debug('querying %s...', query_part)
conn.query(query)
r = conn.store_result()
if r is not None:
......
......@@ -304,9 +304,12 @@ class OperationEventHandler(StorageEventHandler):
serial, next_serial, compression, checksum, data = o
if next_serial is None:
next_serial = INVALID_SERIAL
logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial))
p.answerObject(packet.getId(), oid, serial, next_serial,
compression, checksum, data)
else:
logging.debug('oid = %s not found', dump(oid))
p.oidNotFound(packet.getId(), '%s does not exist' % dump(oid))
conn.addPacket(p)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment