Commit d90c5b83 authored by Julien Muchembled's avatar Julien Muchembled

Allow NEO to store empty values

This changes how NEO stores undo information
and how it is transmitted on the network.
parent 03bf302a
Change History Change History
============== ==============
0.10 (unreleased)
-----------------
- NEO learned to store empty values (although it's useless when managed by
a ZODB Connection).
0.9.2 (unreleased) 0.9.2 (unreleased)
------------------ ------------------
......
...@@ -64,6 +64,9 @@ else: ...@@ -64,6 +64,9 @@ else:
compress = real_compress compress = real_compress
makeChecksum = real_makeChecksum makeChecksum = real_makeChecksum
CHECKED_SERIAL = object()
class Application(object): class Application(object):
"""The client node application.""" """The client node application."""
...@@ -427,15 +430,12 @@ class Application(object): ...@@ -427,15 +430,12 @@ class Application(object):
self._cache.store(oid, *result) self._cache.store(oid, *result)
finally: finally:
self._cache_lock_release() self._cache_lock_release()
if result[0] == '':
raise NEOStorageCreationUndoneError(dump(oid))
return result return result
finally: finally:
self._load_lock_release() self._load_lock_release()
@profiler_decorator @profiler_decorator
def _loadFromStorage(self, oid, at_tid, before_tid): def _loadFromStorage(self, oid, at_tid, before_tid):
data = None
packet = Packets.AskObject(oid, at_tid, before_tid) packet = Packets.AskObject(oid, at_tid, before_tid)
for node, conn in self.cp.iterateForObject(oid, readable=True): for node, conn in self.cp.iterateForObject(oid, readable=True):
try: try:
...@@ -444,23 +444,18 @@ class Application(object): ...@@ -444,23 +444,18 @@ class Application(object):
except ConnectionClosed: except ConnectionClosed:
continue continue
if checksum != makeChecksum(data): if data or checksum:
# Warning: see TODO file. if checksum != makeChecksum(data):
# Check checksum. neo.lib.logging.error('wrong checksum from %s for oid %s',
neo.lib.logging.error('wrong checksum from %s for oid %s',
conn, dump(oid)) conn, dump(oid))
data = None continue
continue if compression:
break data = decompress(data)
if data is None: return data, tid, next_tid
# We didn't got any object from all storage node because of raise NEOStorageCreationUndoneError(dump(oid))
# connection error # We didn't got any object from all storage node because of
raise NEOStorageError('connection failure') # connection error
raise NEOStorageError('connection failure')
# Uncompress data
if compression:
data = decompress(data)
return data, tid, next_tid
@profiler_decorator @profiler_decorator
def _loadFromCache(self, oid, at_tid=None, before_tid=None): def _loadFromCache(self, oid, at_tid=None, before_tid=None):
...@@ -512,8 +507,9 @@ class Application(object): ...@@ -512,8 +507,9 @@ class Application(object):
# This is some undo: either a no-data object (undoing object # This is some undo: either a no-data object (undoing object
# creation) or a back-pointer to an earlier revision (going back to # creation) or a back-pointer to an earlier revision (going back to
# an older object revision). # an older object revision).
data = compressed_data = '' compressed_data = ''
compression = 0 compression = 0
checksum = 0
else: else:
assert data_serial is None assert data_serial is None
compression = self.compress compression = self.compress
...@@ -525,7 +521,7 @@ class Application(object): ...@@ -525,7 +521,7 @@ class Application(object):
compression = 0 compression = 0
else: else:
compression = 1 compression = 1
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid) on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
# Store object in tmp cache # Store object in tmp cache
data_dict = txn_context['data_dict'] data_dict = txn_context['data_dict']
...@@ -600,11 +596,11 @@ class Application(object): ...@@ -600,11 +596,11 @@ class Application(object):
dump(oid), dump(serial)) dump(oid), dump(serial))
for store_oid, store_data in data_dict.iteritems(): for store_oid, store_data in data_dict.iteritems():
store_serial = object_serial_dict[store_oid] store_serial = object_serial_dict[store_oid]
if store_data is None: if store_data is CHECKED_SERIAL:
self._checkCurrentSerialInTransaction(txn_context, self._checkCurrentSerialInTransaction(txn_context,
store_oid, store_serial) store_oid, store_serial)
else: else:
if store_data is '': if store_data is None:
# Some undo # Some undo
neo.lib.logging.warning('Deadlock avoidance cannot' neo.lib.logging.warning('Deadlock avoidance cannot'
' reliably work with undo, this must be ' ' reliably work with undo, this must be '
...@@ -615,7 +611,7 @@ class Application(object): ...@@ -615,7 +611,7 @@ class Application(object):
store_data, unlock=True) store_data, unlock=True)
else: else:
continue continue
elif data is not None: elif data is not CHECKED_SERIAL:
resolved_serial_set = resolved_conflict_serial_dict.setdefault( resolved_serial_set = resolved_conflict_serial_dict.setdefault(
oid, set()) oid, set())
if resolved_serial_set and conflict_serial <= max( if resolved_serial_set and conflict_serial <= max(
...@@ -644,7 +640,7 @@ class Application(object): ...@@ -644,7 +640,7 @@ class Application(object):
# XXX: Is it really required to remove from data_dict ? # XXX: Is it really required to remove from data_dict ?
del data_dict[oid] del data_dict[oid]
txn_context['data_list'].remove(oid) txn_context['data_list'].remove(oid)
if data is None: if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial, raise ReadConflictError(oid=oid, serials=(conflict_serial,
serial)) serial))
raise ConflictError(oid=oid, serials=(txn_context['ttid'], raise ConflictError(oid=oid, serials=(txn_context['ttid'],
...@@ -789,14 +785,14 @@ class Application(object): ...@@ -789,14 +785,14 @@ class Application(object):
try: try:
cache = self._cache cache = self._cache
for oid, data in txn_context['data_dict'].iteritems(): for oid, data in txn_context['data_dict'].iteritems():
if data is None: if data is CHECKED_SERIAL:
# this is just a remain of # this is just a remain of
# checkCurrentSerialInTransaction call, ignore (no data # checkCurrentSerialInTransaction call, ignore (no data
# was modified). # was modified).
continue continue
# Update ex-latest value in cache # Update ex-latest value in cache
cache.invalidate(oid, tid) cache.invalidate(oid, tid)
if data: if data is not None:
# Store in cache with no next_tid # Store in cache with no next_tid
cache.store(oid, data, tid, None) cache.store(oid, data, tid, None)
finally: finally:
...@@ -1097,7 +1093,7 @@ class Application(object): ...@@ -1097,7 +1093,7 @@ class Application(object):
data_dict = txn_context['data_dict'] data_dict = txn_context['data_dict']
if oid not in data_dict: if oid not in data_dict:
# Marker value so we don't try to resolve conflicts. # Marker value so we don't try to resolve conflicts.
data_dict[oid] = None data_dict[oid] = CHECKED_SERIAL
txn_context['data_list'].append(oid) txn_context['data_list'].append(oid)
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid) packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
for node, conn in self.cp.iterateForObject(oid, writable=True): for node, conn in self.cp.iterateForObject(oid, writable=True):
......
...@@ -25,6 +25,7 @@ import neo.lib ...@@ -25,6 +25,7 @@ import neo.lib
from hashlib import md5 from hashlib import md5
from neo.storage.database import DatabaseManager from neo.storage.database import DatabaseManager
from neo.storage.database.manager import CreationUndone
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID
from neo.lib import util from neo.lib import util
...@@ -112,9 +113,6 @@ def _noPrune(_): ...@@ -112,9 +113,6 @@ def _noPrune(_):
prune = _prune prune = _prune
class CreationUndone(Exception):
pass
def iterObjSerials(obj): def iterObjSerials(obj):
for tserial in obj.values(): for tserial in obj.values():
for serial in tserial.keys(): for serial in tserial.keys():
...@@ -658,7 +656,7 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -658,7 +656,7 @@ class BTreeDatabaseManager(DatabaseManager):
# No entry before pack TID, nothing to pack on this object. # No entry before pack TID, nothing to pack on this object.
pass pass
else: else:
if tserial[max_serial][2] == '': if tserial[max_serial][1] is None:
# Last version before/at pack TID is a creation undo, drop # Last version before/at pack TID is a creation undo, drop
# it too. # it too.
max_serial += 1 max_serial += 1
......
...@@ -262,10 +262,7 @@ class DatabaseManager(object): ...@@ -262,10 +262,7 @@ class DatabaseManager(object):
_, compression, checksum, data = self._getObjectData(oid, _, compression, checksum, data = self._getObjectData(oid,
data_serial, serial) data_serial, serial)
except CreationUndone: except CreationUndone:
compression = 0 pass
# XXX: this is the valid checksum for empty string
checksum = 1
data = ''
data_serial = None data_serial = None
if serial is not None: if serial is not None:
serial = p64(serial) serial = p64(serial)
......
...@@ -809,13 +809,13 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -809,13 +809,13 @@ class MySQLDatabaseManager(DatabaseManager):
for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, ' for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, '
'MAX(serial) FROM obj_short WHERE serial <= %(tid)d ' 'MAX(serial) FROM obj_short WHERE serial <= %(tid)d '
'GROUP BY oid' % {'tid': tid}): 'GROUP BY oid' % {'tid': tid}):
if q('SELECT LENGTH(value) FROM obj WHERE partition =' if q('SELECT 1 FROM obj WHERE partition ='
'%(partition)s AND oid = %(oid)d AND ' '%(partition)s AND oid = %(oid)d AND '
'serial = %(max_serial)d' % { 'serial = %(max_serial)d AND checksum IS NULL' % {
'oid': oid, 'oid': oid,
'partition': getPartition(oid), 'partition': getPartition(oid),
'max_serial': max_serial, 'max_serial': max_serial,
})[0][0] == 0: }):
count += 1 count += 1
max_serial += 1 max_serial += 1
if count: if count:
......
...@@ -96,6 +96,9 @@ class BaseClientAndStorageOperationHandler(EventHandler): ...@@ -96,6 +96,9 @@ class BaseClientAndStorageOperationHandler(EventHandler):
serial, next_serial, compression, checksum, data, data_serial = o serial, next_serial, compression, checksum, data, data_serial = o
neo.lib.logging.debug('oid = %s, serial = %s, next_serial = %s', neo.lib.logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial)) dump(oid), dump(serial), dump(next_serial))
if checksum is None:
checksum = 0
data = ''
p = Packets.AnswerObject(oid, serial, next_serial, p = Packets.AnswerObject(oid, serial, next_serial,
compression, checksum, data, data_serial) compression, checksum, data, data_serial)
conn.answer(p) conn.answer(p)
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import neo.lib import neo.lib
from neo.lib import protocol from neo.lib import protocol
from neo.lib.util import dump from neo.lib.util import dump, makeChecksum
from neo.lib.protocol import Packets, LockState, Errors from neo.lib.protocol import Packets, LockState, Errors
from neo.storage.handlers import BaseClientAndStorageOperationHandler from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.storage.transactions import ConflictError, DelayedError from neo.storage.transactions import ConflictError, DelayedError
...@@ -88,11 +88,12 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -88,11 +88,12 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
compression, checksum, data, data_serial, ttid, unlock): compression, checksum, data, data_serial, ttid, unlock):
# register the transaction # register the transaction
self.app.tm.register(conn.getUUID(), ttid) self.app.tm.register(conn.getUUID(), ttid)
if data_serial is not None: if data or checksum:
assert data == '', repr(data) # TODO: return an appropriate error packet
# Change data to None here, to do it only once, even if store gets assert makeChecksum(data) == checksum
# delayed. assert data_serial is None
data = None else:
checksum = data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data, self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, time.time()) data_serial, ttid, unlock, time.time())
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
import unittest import unittest
from mock import Mock, ReturnValues from mock import Mock, ReturnValues
from collections import deque from collections import deque
from neo.lib.util import makeChecksum
from neo.tests import NeoUnitTestBase from neo.tests import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.transactions import ConflictError, DelayedError from neo.storage.transactions import ConflictError, DelayedError
...@@ -207,7 +208,8 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -207,7 +208,8 @@ class StorageClientHandlerTests(NeoUnitTestBase):
def _getObject(self): def _getObject(self):
oid = self.getOID(0) oid = self.getOID(0)
serial = self.getNextTID() serial = self.getNextTID()
return (oid, serial, 1, '1', 'DATA') data = 'DATA'
return (oid, serial, 1, makeChecksum(data), data)
def _checkStoreObjectCalled(self, *args): def _checkStoreObjectCalled(self, *args):
calls = self.app.tm.mockGetNamedCalls('storeObject') calls = self.app.tm.mockGetNamedCalls('storeObject')
...@@ -237,10 +239,10 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -237,10 +239,10 @@ class StorageClientHandlerTests(NeoUnitTestBase):
tid = self.getNextTID() tid = self.getNextTID()
oid, serial, comp, checksum, data = self._getObject() oid, serial, comp, checksum, data = self._getObject()
data_tid = self.getNextTID() data_tid = self.getNextTID()
self.operation.askStoreObject(conn, oid, serial, comp, checksum, self.operation.askStoreObject(conn, oid, serial, comp, 0,
'', data_tid, tid, False) '', data_tid, tid, False)
self._checkStoreObjectCalled(tid, serial, oid, comp, self._checkStoreObjectCalled(tid, serial, oid, comp,
checksum, None, data_tid, False) None, None, data_tid, False)
pconflicting, poid, pserial = self.checkAnswerStoreObject(conn, pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
decode=True) decode=True)
self.assertEqual(pconflicting, 0) self.assertEqual(pconflicting, 0)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import threading import threading
import transaction
from thread import get_ident from thread import get_ident
from persistent import Persistent from persistent import Persistent
from neo.storage.transactions import TransactionManager, \ from neo.storage.transactions import TransactionManager, \
...@@ -37,6 +38,25 @@ class PCounterWithResolution(PCounter): ...@@ -37,6 +38,25 @@ class PCounterWithResolution(PCounter):
class Test(NEOThreadedTest): class Test(NEOThreadedTest):
def testBasicStore(self):
cluster = NEOCluster()
try:
cluster.start()
storage = cluster.getZODBStorage()
for data in 'foo', '':
oid = storage.new_oid()
txn = transaction.Transaction()
storage.tpc_begin(txn)
r1 = storage.store(oid, None, data, '', txn)
r2 = storage.tpc_vote(txn)
serial = storage.tpc_finish(txn)
self.assertEqual((data, serial), storage.load(oid, ''))
storage._cache.clear()
self.assertEqual((data, serial), storage.load(oid, ''))
self.assertEqual((data, serial), storage.load(oid, ''))
finally:
cluster.stop()
def testDelayedUnlockInformation(self): def testDelayedUnlockInformation(self):
except_list = [] except_list = []
def delayUnlockInformation(conn, packet): def delayUnlockInformation(conn, packet):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment