Commit a0280bec authored by Julien Muchembled's avatar Julien Muchembled

pack: some cleanup & better error handling

parent 70277a73
...@@ -231,12 +231,13 @@ class Storage(BaseStorage.BaseStorage, ...@@ -231,12 +231,13 @@ class Storage(BaseStorage.BaseStorage,
logging.exception('source=%r', source) logging.exception('source=%r', source)
raise raise
def pack(self, t, referencesf, gc=False): def pack(self, t, referencesf=None):
if gc: if referencesf is not None:
logging.warning('Garbage Collection is not available in NEO,' logging.warning('Garbage Collection is not available in NEO,'
' please use an external tool. Packing without GC.') ' please use an external tool. Packing without GC.')
try: try:
self.app.pack(tidFromTime(t)) app = self.app
app.pack(min(tidFromTime(t), app.last_tid))
except Exception: except Exception:
logging.exception('pack_time=%r', t) logging.exception('pack_time=%r', t)
raise raise
......
...@@ -1039,11 +1039,20 @@ class Application(ThreadedApplication): ...@@ -1039,11 +1039,20 @@ class Application(ThreadedApplication):
def sync(self): def sync(self):
self._askPrimary(Packets.Ping()) self._askPrimary(Packets.Ping())
def pack(self, tid, _oids=None): # TODO: API for partial pack def setPackOrder(self, transaction, tid, oids=None):
self._txn_container.get(transaction).pack = oids and sorted(oids), tid
def pack(self, tid, oids=None):
if tid == ZERO_TID:
return
transaction = TransactionMetaData(description=TXN_PACK_DESC) transaction = TransactionMetaData(description=TXN_PACK_DESC)
self.tpc_begin(None, transaction) try:
self._txn_container.get(transaction).pack = _oids and sorted(_oids), tid self.tpc_begin(None, transaction)
tid = self.tpc_finish(transaction) self.setPackOrder(transaction, tid, oids)
tid = self.tpc_finish(transaction)
except:
self.tpc_abort(transaction)
raise
if not self.wait_for_pack: if not self.wait_for_pack:
return return
# Waiting for pack to be finished is only needed # Waiting for pack to be finished is only needed
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib.handler import DelayEvent from neo.lib.handler import DelayEvent
from neo.lib.exception import ProtocolError from neo.lib.exception import ProtocolError
from neo.lib.protocol import Packets, MAX_TID, Errors from neo.lib.protocol import Packets, MAX_TID, ZERO_TID, Errors
from ..app import monotonic_time from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
...@@ -66,6 +66,10 @@ class ClientServiceHandler(MasterHandler): ...@@ -66,6 +66,10 @@ class ClientServiceHandler(MasterHandler):
def askFinishTransaction(self, conn, ttid, oid_list, checked_list, pack): def askFinishTransaction(self, conn, ttid, oid_list, checked_list, pack):
app = self.app app = self.app
if pack:
tid = pack[1]
if tid is None or not ZERO_TID < tid <= app.getLastTransaction():
raise ProtocolError("invalid pack time")
tid, node_list = app.tm.prepare( tid, node_list = app.tm.prepare(
app, app,
ttid, ttid,
......
...@@ -22,7 +22,7 @@ from time import time ...@@ -22,7 +22,7 @@ from time import time
import transaction import transaction
from persistent import Persistent from persistent import Persistent
from ZODB.POSException import UndoError from ZODB.POSException import UndoError
from neo.client.exception import NEOUndoPackError from neo.client.exception import NEOStorageError, NEOUndoPackError
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import ClusterStates, Packets from neo.lib.protocol import ClusterStates, Packets
from neo.lib.util import add64, p64 from neo.lib.util import add64, p64
...@@ -80,6 +80,10 @@ class PackTests(NEOThreadedTest): ...@@ -80,6 +80,10 @@ class PackTests(NEOThreadedTest):
yield cluster.client.last_tid yield cluster.client.last_tid
c.close() c.close()
def assertPopulated(self, c):
r = c.root()
self.assertEqual([3, 3, 2, 1], [r[x].value for x in 'abcd'])
@with_cluster(partitions=3, replicas=1, storage_count=3) @with_cluster(partitions=3, replicas=1, storage_count=3)
def testOutdatedNodeIsBack(self, cluster): def testOutdatedNodeIsBack(self, cluster):
client = cluster.client client = cluster.client
...@@ -264,17 +268,28 @@ class PackTests(NEOThreadedTest): ...@@ -264,17 +268,28 @@ class PackTests(NEOThreadedTest):
client.pack(tid) client.pack(tid)
deque(populate, 0) deque(populate, 0)
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
r = c.root()
history = c.db().history history = c.db().history
def check(*counts): def check(*counts):
c.cacheMinimize() c.cacheMinimize()
client._cache.clear() client._cache.clear()
self.assertEqual([3, 3, 2, 1], [r[x].value for x in 'abcd']) self.assertPopulated(c)
self.assertSequenceEqual(counts, self.assertSequenceEqual(counts,
[len(history(p64(i), 10)) for i in xrange(5)]) [len(history(p64(i), 10)) for i in xrange(5)])
check(4, 2, 4, 2, 2) check(4, 2, 4, 2, 2)
reset0(disable_pack=False) reset0(disable_pack=False)
check(1, 2, 2, 2, 2) check(1, 2, 2, 2, 2)
@with_cluster()
def testInvalidPackTID(self, cluster):
deque(self.populate(cluster), 0)
client = cluster.client
client.wait_for_pack = True
tid = client.last_tid
self.assertRaises(NEOStorageError, client.pack, add64(tid, 1))
client.pack(tid)
t, c = cluster.getTransaction()
self.assertPopulated(c)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -20,7 +20,7 @@ from ZODB.tests.PackableStorage import \ ...@@ -20,7 +20,7 @@ from ZODB.tests.PackableStorage import \
PackableStorageWithOptionalGC, PackableUndoStorage PackableStorageWithOptionalGC, PackableUndoStorage
from ZODB.tests.StorageTestBase import StorageTestBase from ZODB.tests.StorageTestBase import StorageTestBase
from .. import expectedFailure from .. import expectedFailure, Patch
from . import ZODBTestCase from . import ZODBTestCase
class PackableTests(ZODBTestCase, StorageTestBase, class PackableTests(ZODBTestCase, StorageTestBase,
...@@ -30,6 +30,14 @@ class PackableTests(ZODBTestCase, StorageTestBase, ...@@ -30,6 +30,14 @@ class PackableTests(ZODBTestCase, StorageTestBase,
PackableStorageWithOptionalGC.checkPackAllRevisions) PackableStorageWithOptionalGC.checkPackAllRevisions)
checkPackUndoLog = expectedFailure()(PackableUndoStorage.checkPackUndoLog) checkPackUndoLog = expectedFailure()(PackableUndoStorage.checkPackUndoLog)
def checkPackAllRevisionsNoGC(self):
def pack(orig, t, referencesf, gc):
assert referencesf is not None
assert gc is False
return orig(t)
with Patch(self._storage, pack=pack):
super(PackableTests, self).checkPackAllRevisionsNoGC()
if __name__ == "__main__": if __name__ == "__main__":
suite = unittest.makeSuite(PackableTests, 'check') suite = unittest.makeSuite(PackableTests, 'check')
unittest.main(defaultTest='suite') unittest.main(defaultTest='suite')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment