Commit ba8f014f authored by Vincent Pelletier's avatar Vincent Pelletier

Pipeline "store" action on client side.

Storage.store calls can be pipelined when implementation can take advantage of
it (as Zeo does). This allows reducing the impact of (network-induced, mainly)
latency by sending all objects to storages without waiting for storage answer.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1788 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ce162b67
......@@ -18,8 +18,7 @@
from ZODB import BaseStorage, ConflictResolution, POSException
from neo.client.app import Application
from neo.client.exception import NEOStorageConflictError, \
NEOStorageNotFoundError
from neo.client.exception import NEOStorageNotFoundError
class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage):
......@@ -61,7 +60,8 @@ class Storage(BaseStorage.BaseStorage,
def tpc_vote(self, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
return self.app.tpc_vote(transaction=transaction)
return self.app.tpc_vote(transaction=transaction,
tryToResolveConflict=self.tryToResolveConflict)
def tpc_abort(self, transaction):
if self._is_read_only:
......@@ -72,30 +72,11 @@ class Storage(BaseStorage.BaseStorage,
return self.app.tpc_finish(transaction=transaction, f=f)
def store(self, oid, serial, data, version, transaction):
app = self.app
if self._is_read_only:
raise POSException.ReadOnlyError()
try:
return app.store(oid = oid, serial = serial,
data = data, version = version,
transaction = transaction)
except NEOStorageConflictError:
conflict_serial = app.getConflictSerial()
tid = app.getTID()
if conflict_serial <= tid:
# Try to resolve conflict only if conflicting serial is older
# than the current transaction ID
new_data = self.tryToResolveConflict(oid,
conflict_serial,
serial, data)
if new_data is not None:
# Try again after conflict resolution
self.store(oid, conflict_serial,
new_data, version, transaction)
return ConflictResolution.ResolvedSerial
raise POSException.ConflictError(oid=oid,
serials=(tid,
serial),data=data)
return self.app.store(oid=oid, serial=serial,
data=data, version=version, transaction=transaction,
tryToResolveConflict=self.tryToResolveConflict)
def getSerial(self, oid):
try:
......@@ -123,11 +104,8 @@ class Storage(BaseStorage.BaseStorage,
def undo(self, transaction_id, txn):
if self._is_read_only:
raise POSException.ReadOnlyError()
try:
return self.app.undo(transaction_id = transaction_id,
txn = txn, wrapper = self)
except NEOStorageConflictError:
raise POSException.ConflictError
return self.app.undo(transaction_id=transaction_id, txn=txn,
tryToResolveConflict=self.tryToResolveConflict)
def undoLog(self, first, last, filter):
......
......@@ -23,6 +23,7 @@ from random import shuffle
from time import sleep
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.ConflictResolution import ResolvedSerial
from neo import setupLog
setupLog('CLIENT', verbose=True)
......@@ -36,7 +37,7 @@ from neo.locking import Lock
from neo.connection import MTClientConnection
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageConflictError
from neo.client.exception import NEOStorageError
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
from neo.exception import NeoException
from neo.client.handlers import storage, master
......@@ -80,6 +81,9 @@ class ThreadContext(object):
'tid': None,
'txn': None,
'data_dict': {},
'object_serial_dict': {},
'object_stored_counter_dict': {},
'conflict_serial_dict': {},
'object_stored': 0,
'txn_voted': False,
'txn_finished': False,
......@@ -88,9 +92,7 @@ class ThreadContext(object):
'history': None,
'node_tids': {},
'node_ready': False,
'conflict_serial': 0,
'asked_object': 0,
'object_stored_counter': 0,
}
......@@ -534,7 +536,8 @@ class Application(object):
self.local_var.txn = transaction
def store(self, oid, serial, data, version, transaction):
def store(self, oid, serial, data, version, transaction,
tryToResolveConflict):
"""Store object."""
if transaction is not self.local_var.txn:
raise StorageTransactionError(self, transaction)
......@@ -551,49 +554,100 @@ class Application(object):
checksum = makeChecksum(compressed_data)
p = Packets.AskStoreObject(oid, serial, 1,
checksum, compressed_data, self.local_var.tid)
# Store object in tmp cache
self.local_var.data_dict[oid] = data
# Store data on each node
self.local_var.object_stored_counter = 0
self.local_var.object_stored_counter_dict[oid] = 0
self.local_var.object_serial_dict[oid] = (serial, version)
local_queue = self.local_var.queue
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
self.local_var.object_stored = 0
try:
self._askStorage(conn, p)
try:
conn.ask(local_queue, p)
finally:
conn.unlock()
except ConnectionClosed:
continue
# Check we don't get any conflict
if self.local_var.object_stored[0] == -1:
if self.local_var.data_dict.has_key(oid):
# One storage already accept the object, is it normal ??
# remove from dict and raise ConflictError, don't care of
# previous node which already store data as it would be
# resent again if conflict is resolved or txn will be
# aborted
del self.local_var.data_dict[oid]
self.local_var.conflict_serial = self.local_var.object_stored[1]
raise NEOStorageConflictError
# increase counter so that we know if a node has stored the object
# or not
self.local_var.object_stored_counter += 1
if self.local_var.object_stored_counter == 0:
# no storage nodes were available
raise NEOStorageError('tpc_store failed')
self._waitAnyMessage(False)
return None
# Store object in tmp cache
self.local_var.data_dict[oid] = data
def _handleConflicts(self, tryToResolveConflict):
result = []
append = result.append
local_var = self.local_var
# Check for conflicts
data_dict = local_var.data_dict
object_serial_dict = local_var.object_serial_dict
for oid, conflict_serial in local_var.conflict_serial_dict.items():
serial, version = object_serial_dict[oid]
data = data_dict[oid]
tid = local_var.tid
resolved = False
if conflict_serial <= tid:
new_data = tryToResolveConflict(oid, conflict_serial, serial,
data)
if new_data is not None:
# Forget this conflict
del local_var.conflict_serial_dict[oid]
# Try to store again
self.store(oid, conflict_serial, new_data, version,
local_var.txn, tryToResolveConflict)
append(oid)
resolved = True
if not resolved:
# XXX: Is it really required to remove from data_dict ?
del data_dict[oid]
raise ConflictError(oid=oid,
serials=(tid, serial), data=data)
return result
return self.local_var.tid
def waitStoreResponses(self, tryToResolveConflict):
result = []
append = result.append
resolved_oid_set = set()
update = resolved_oid_set.update
local_var = self.local_var
queue = self.local_var.queue
tid = local_var.tid
_waitAnyMessage = self._waitAnyMessage
_handleConflicts = self._handleConflicts
pending = self.dispatcher.pending
while True:
# Wait for all requests to be answered (or their connection to be
# dected as closed)
while pending(queue):
_waitAnyMessage()
conflicts = _handleConflicts(tryToResolveConflict)
if conflicts:
update(conflicts)
else:
# No more conflict resolutions to do, no more pending store
# requests
break
# Check for never-stored objects, and update result for all others
for oid, store_count in \
local_var.object_stored_counter_dict.iteritems():
if store_count == 0:
raise NEOStorageError('tpc_store failed')
elif oid in resolved_oid_set:
append((oid, ResolvedSerial))
else:
append((oid, tid))
return result
def tpc_vote(self, transaction):
def tpc_vote(self, transaction, tryToResolveConflict):
"""Store current transaction."""
local_var = self.local_var
if transaction is not local_var.txn:
raise StorageTransactionError(self, transaction)
result = self.waitStoreResponses(tryToResolveConflict)
tid = local_var.tid
# Store data on each node
voted_counter = 0
......@@ -626,6 +680,8 @@ class Application(object):
# tpc_finish.
self._getMasterConnection()
return result
def tpc_abort(self, transaction):
"""Abort current transaction."""
if transaction is not self.local_var.txn:
......@@ -690,7 +746,7 @@ class Application(object):
finally:
self._load_lock_release()
def undo(self, transaction_id, txn, wrapper):
def undo(self, transaction_id, txn, tryToResolveConflict):
if txn is not self.local_var.txn:
raise StorageTransactionError(self, transaction_id)
......@@ -739,19 +795,9 @@ class Application(object):
# Third do transaction with old data
oid_list = data_dict.keys()
for oid in oid_list:
data = data_dict[oid]
try:
self.store(oid, transaction_id, data, None, txn)
except NEOStorageConflictError, serial:
if serial <= self.local_var.tid:
new_data = wrapper.tryToResolveConflict(oid,
self.local_var.tid, serial, data)
if new_data is not None:
self.store(oid, self.local_var.tid, new_data, None, txn)
continue
raise ConflictError(oid = oid, serials = (self.local_var.tid,
serial),
data = data)
self.store(oid, transaction_id, data_dict[oid], None, txn,
tryToResolveConflict)
self.waitStoreResponses(tryToResolveConflict)
return self.local_var.tid, oid_list
def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
......@@ -930,9 +976,6 @@ class Application(object):
def getTID(self):
return self.local_var.tid
def getConflictSerial(self):
return self.local_var.conflict_serial
def setTransactionFinished(self):
self.local_var.txn_finished = True
......
......@@ -23,8 +23,5 @@ class ConnectionClosed(Exception):
class NEOStorageError(POSException.StorageError):
pass
class NEOStorageConflictError(NEOStorageError):
pass
class NEOStorageNotFoundError(NEOStorageError):
pass
......@@ -63,10 +63,18 @@ class StorageAnswersHandler(AnswerBaseHandler):
compression, checksum, data)
def answerStoreObject(self, conn, conflicting, oid, serial):
local_var = self.app.local_var
object_stored_counter_dict = local_var.object_stored_counter_dict
if conflicting:
self.app.local_var.object_stored = -1, serial
assert object_stored_counter_dict[oid] == 0, \
object_stored_counter_dict[oid]
previous_conflict_serial = local_var.conflict_serial_dict.get(oid,
None)
assert previous_conflict_serial in (None, serial), \
(previous_conflict_serial, serial)
local_var.conflict_serial_dict[oid] = serial
else:
self.app.local_var.object_stored = oid, serial
object_stored_counter_dict[oid] += 1
def answerStoreTransaction(self, conn, tid):
if tid != self.app.getTID():
......
This diff is collapsed.
......@@ -86,13 +86,18 @@ class StorageAnswerHandlerTests(NeoTestBase):
oid = self.getOID(0)
tid = self.getNextTID()
# conflict
self.app.local_var.object_stored = None
local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: 0}
local_var.conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertEqual(self.app.local_var.object_stored, (-1, tid))
self.assertEqual(local_var.conflict_serial_dict[oid], tid)
self.assertFalse(local_var.object_stored_counter_dict[oid], 0)
# no conflict
self.app.local_var.object_stored = None
local_var.object_stored_counter_dict = {oid: 0}
local_var.conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 0, oid, tid)
self.assertEqual(self.app.local_var.object_stored, (oid, tid))
self.assertFalse(oid in local_var.conflict_serial_dict)
self.assertEqual(local_var.object_stored_counter_dict[oid], 1)
def test_answerStoreTransaction(self):
conn = self.getConnection()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment