Commit 071c6bf5 authored by Julien Muchembled

New API to iterate over non-deleted OIDs

This will be used by an external GC.

To be pushed upstream.
parent ad62c5c7
......@@ -18,6 +18,7 @@ import heapq
import random
import time
from collections import defaultdict
from functools import partial
try:
from ZODB._compat import dumps, loads, _protocol
......@@ -29,7 +30,7 @@ from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_OID, ZERO_TID
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock
from neo.lib.connection import MTClientConnection, ConnectionClosed
......@@ -952,6 +953,42 @@ class Application(ThreadedApplication):
append(txn_info)
return (tid, txn_list)
def oids(self, tid=None, min_oid=None, max_oid=None):
    """Iterate over non-deleted OIDs, in ascending order

    This is a generator: nothing is asked to storage nodes before the
    first item is requested.

    tid     -- OIDs are listed as of this tid (default: self.last_tid)
    min_oid -- OID to start from (default: ZERO_OID)
    max_oid -- if not None, stop at the first OID greater than this value

    Each partition is read separately, by chunks of at most 1000 OIDs,
    and the per-partition streams are merged with a heap.
    """
    if tid is None:
        tid = self.last_tid
    def oids(offset, oid=min_oid or ZERO_OID):
        # Ask the OIDs of partition `offset`, starting at `oid`, until a
        # non-empty chunk is received. Return a heap entry:
        #   [current OID,
        #    function returning the next OID of the current chunk,
        #    partition,
        #    min_oid to use for the next chunk (None if there is none)]
        # or None if there is nothing left to iterate for this partition.
        while True:
            oid, oid_list = self._askStorageForRead(offset,
                Packets.AskOIDsFrom(offset, 1000, oid, tid))
            i = partial(next, iter(oid_list))
            try:
                return [i(), i, offset, oid]
            except StopIteration:
                # Empty chunk: ask again from the returned min_oid, unless
                # the storage has nothing more or max_oid is exceeded.
                if oid is None or None is not max_oid < oid:
                    return
    # One entry per partition that has at least 1 OID to return. Entries
    # are lists, so the heap orders them by current OID (first item).
    h = [x for x in map(oids, xrange(self.pt.getPartitions())) if x]
    heapq.heapify(h)
    heappop = partial(heapq.heappop, h)
    heappushpop = partial(heapq.heappushpop, h)
    while h:
        x = heappop()
        while True:
            oid = x[0]
            if None is not max_oid < oid:
                # x holds the smallest pending OID:
                # all remaining ones are also beyond max_oid.
                return
            yield oid
            try:
                # Advance within the current chunk.
                x[0] = x[1]()
            except StopIteration:
                # Chunk exhausted: fetch the next one for this partition.
                oid = x[3]
                if oid is None:
                    break
                x = oids(x[2], oid)
                if not x:
                    break
            # Put the entry back and continue with the smallest one.
            x = heappushpop(x)
def history(self, oid, size=1, filter=None):
packet = Packets.AskObjectHistory(oid, 0, size)
result = []
......
......@@ -168,6 +168,9 @@ class StorageAnswersHandler(AnswerBaseHandler):
logging.debug('Get %u TIDs from %r', len(tid_list), conn)
self.app.setHandlerData(tid_list)
def answerOIDsFrom(self, conn, *args):
    """Store all fields of the answer, as a tuple, with app.setHandlerData"""
    handler_data = args
    self.app.setHandlerData(handler_data)
def answerTransactionInformation(self, conn, tid,
user, desc, ext, packed, oid_list):
self.app.setHandlerData(({
......
......@@ -26,7 +26,7 @@ except ImportError:
# The protocol version must be increased whenever upgrading a node may require
# upgrading other nodes.
PROTOCOL_VERSION = 3
PROTOCOL_VERSION = 4
# By encoding the handshake packet with msgpack, the whole NEO stream can be
# decoded with msgpack. The first byte is 0x92, which is different from TLS
# Handshake (0x16).
......@@ -673,6 +673,13 @@ class Packets(dict):
:nodes: C -> S
""")
# Request fields: partition, maximum number of OIDs, min_oid, tid.
# Answer fields: the min_oid to use for the next request (None if there is
# nothing more to ask) and the list of OIDs.
AskOIDsFrom, AnswerOIDsFrom = request("""
Iterate over non-deleted OIDs starting at min_oid.
The order of OIDs is ascending.
:nodes: C -> S
""")
WaitForPack, WaitedForPack = request("""
Wait until pack given by tid is completed.
......
......@@ -378,7 +378,8 @@ class ImporterDatabaseManager(DatabaseManager):
*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange _pack
deleteObject deleteTransaction _dropPartition _getLastTID nonempty
getReplicationObjectList _getTIDList _setPartitionPacked""".split())
getReplicationObjectList _getTIDList _setPartitionPacked oidsFrom
""".split())
_getPartition = property(lambda self: self.db._getPartition)
_getReadablePartition = property(lambda self: self.db._getReadablePartition)
......
......@@ -1445,6 +1445,14 @@ class DatabaseManager(object):
raise NonReadableCell
return ()
@abstract
def oidsFrom(self, partition, length, min_oid, tid):
    """List non-deleted OIDs of a partition, as of a given tid

    partition -- partition to scan
    length    -- maximum number of OIDs to process in this call
    min_oid   -- lowest OID to consider (inclusive)
    tid       -- only records with a tid lower than or equal to this
                 value are taken into account

    Return a 2-tuple (next_min_oid, oid_list):
    - oid_list is the list of non-deleted OIDs, in ascending order. It
      never contains more than `length` items and it can contain less
      (it can even be empty) whereas there are still OIDs to iterate,
      because deleted OIDs are counted before being filtered out.
    - next_min_oid is the min_oid value to use to get the next OIDs.
      The SQL backends return None when less than `length` OIDs were
      found, and the client stops iterating the partition in this case.
    """
@abstract
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
"""Return a list of TIDs in ascending order from an initial tid value,
......
......@@ -52,7 +52,7 @@ else:
from . import LOG_QUERIES, DatabaseFailure
from .manager import MVCCDatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.exception import UndoPackError
from neo.lib.exception import NonReadableCell, UndoPackError
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
......@@ -976,6 +976,22 @@ class MySQLDatabaseManager(MVCCDatabaseManager):
" ORDER BY tid DESC LIMIT %d,%d"
% (','.join(map(str, partition_list)), offset, length)))
def oidsFrom(self, partition, length, min_oid, tid):
    """List non-deleted OIDs of a partition, as of a given tid

    Return a 2-tuple:
    - the min_oid value to use to get the next OIDs, or None if less
      than `length` OIDs (deleted ones included) were found from min_oid
    - the list of non-deleted OIDs, in ascending order

    Raise NonReadableCell if the partition is not readable.
    """
    if partition not in self._readable_set:
        raise NonReadableCell
    p64 = util.p64
    u64 = util.u64
    # The subquery selects, for at most `length` OIDs, the last record at
    # or before `tid`, and the join fetches the data_id of these records.
    # Both levels are sorted explicitly: the LIMIT clause and the use of
    # r[-1] below are only correct if OIDs are processed in ascending
    # order, and SQL does not guarantee any order without ORDER BY.
    # Parameters are formatted with %d so that only numbers can end up
    # in the query.
    r = self.query("SELECT oid, data_id"
        " FROM obj FORCE INDEX(PRIMARY) JOIN ("
            "SELECT `partition`, oid, MAX(tid) AS tid"
            " FROM obj FORCE INDEX(PRIMARY)"
            " WHERE `partition`=%d AND oid>=%d AND tid<=%d"
            " GROUP BY oid ORDER BY oid LIMIT %d"
        ") AS t USING (`partition`, oid, tid)"
        " ORDER BY oid"
        % (partition, u64(min_oid), u64(tid), length))
    # Records without data_id are skipped (only non-deleted OIDs are
    # wanted). The next OID to look at is the last one found + self.np
    # (presumably the number of partitions, i.e. the distance between
    # 2 consecutive OIDs of a same partition).
    return None if len(r) < length else p64(r[-1][0] + self.np), \
        [p64(oid) for oid, data_id in r if data_id is not None]
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
u64 = util.u64
p64 = util.p64
......
......@@ -24,7 +24,7 @@ import traceback
from . import LOG_QUERIES
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.exception import UndoPackError
from neo.lib.exception import NonReadableCell, UndoPackError
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
......@@ -710,6 +710,20 @@ class SQLiteDatabaseManager(DatabaseManager):
" ORDER BY tid DESC LIMIT %d,%d"
% (','.join(map(str, partition_list)), offset, length)))
def oidsFrom(self, partition, length, min_oid, tid):
    """List non-deleted OIDs of a partition, as of a given tid

    Return a 2-tuple:
    - the min_oid value to use to get the next OIDs, or None if less
      than `length` OIDs (deleted ones included) were found from min_oid
    - the list of non-deleted OIDs, in ascending order

    Raise NonReadableCell if the partition is not readable.
    """
    if partition not in self._readable_set:
        raise NonReadableCell
    p64 = util.p64
    u64 = util.u64
    # The subquery selects, for at most `length` OIDs, the last record at
    # or before `tid`, and the join fetches the data_id of these records.
    # Both levels are sorted explicitly: the LIMIT clause and the use of
    # r[-1] below are only correct if OIDs are processed in ascending
    # order, and SQLite leaves the order undefined without ORDER BY.
    r = self.query("SELECT oid, data_id FROM obj JOIN ("
            "SELECT `partition`, oid, MAX(tid) AS tid FROM obj"
            " WHERE partition=? AND oid>=? AND tid<=?"
            " GROUP BY oid ORDER BY oid LIMIT ?"
        ") AS t USING (`partition`, oid, tid)"
        " ORDER BY oid",
        (partition, u64(min_oid), u64(tid), length)).fetchall()
    # Records without data_id are skipped (only non-deleted OIDs are
    # wanted). The next OID to look at is the last one found + self.np
    # (presumably the number of partitions, i.e. the distance between
    # 2 consecutive OIDs of a same partition).
    return None if len(r) < length else p64(r[-1][0] + self.np), \
        [p64(oid) for oid, data_id in r if data_id is not None]
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
u64 = util.u64
p64 = util.p64
......
......@@ -244,6 +244,13 @@ class ClientOperationHandler(BaseHandler):
logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(locked))
def askOIDsFrom(self, conn, partition, length, min_oid, tid):
    """Answer with the result of dm.oidsFrom for the given parameters

    The request is delayed (DelayEvent is raised) as long as
    tm.isLockedTid(tid) is true.
    """
    node = self.app
    if node.tm.isLockedTid(tid):
        raise DelayEvent
    result = node.dm.oidsFrom(partition, length, min_oid, tid)
    conn.answer(Packets.AnswerOIDsFrom(*result))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyOperationHandler(ClientOperationHandler):
......
......@@ -22,6 +22,7 @@ AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)])
AnswerObject(p64,p64,?p64,?int,bin,bin,?p64)
AnswerObjectHistory(p64,[(p64,int)])
AnswerObjectUndoSerial({p64:(p64,?p64,bool)})
AnswerOIDsFrom(?p64,[p64])
AnswerPackOrders([(p64,?bool,bool,?[p64],p64)])
AnswerPartitionList(int,int,[[(int,CellStates)]])
AnswerPartitionTable(int,int,[[(int,CellStates)]])
......@@ -56,6 +57,7 @@ AskNodeList(NodeTypes)
AskObject(p64,?p64,?p64)
AskObjectHistory(p64,int,int)
AskObjectUndoSerial(p64,p64,p64,[p64])
AskOIDsFrom(int,int,p64,p64)
AskPackOrders(p64)
AskPartitionList(int,int,?)
AskPartitionTable()
......
......@@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import random
import sys
import threading
import time
......@@ -142,6 +143,42 @@ class Test(NEOThreadedTest):
self.assertRaises(POSException.POSKeyError,
storage.load, oid, '')
@with_cluster(storage_count=3, replicas=1, partitions=5)
def testIterOIDs(self, cluster):
    """Check client.oids(), with and without bounds, and after deletions"""
    storage = cluster.getZODBStorage()
    client = cluster.client
    # OIDs (as integers) of all stored objects, in creation order.
    oids = []
    # 5 transactions, each one creating 7 objects.
    for i in xrange(5):
        txn = transaction.Transaction()
        storage.tpc_begin(txn)
        for i in xrange(7):
            oid = client.new_oid()
            oids.append(u64(oid))
            storage.store(oid, None, '', '', txn)
            # Allocate an OID that is never stored.
            client.new_oid()
        storage.tpc_vote(txn)
        storage.tpc_finish(txn)
    tid = client.last_tid
    # Without bounds, all stored OIDs are returned, in creation order.
    self.assertEqual(oids, map(u64, client.oids(tid)))
    # Force storage nodes to process only 3 OIDs per request, so that the
    # client has to ask several chunks per partition, and check that both
    # min_oid and max_oid are inclusive bounds.
    def askOIDsFrom(orig, self, conn, partition, length, min_oid, tid):
        return orig(self, conn, partition, 3, min_oid, tid)
    with Patch(ClientOperationHandler, askOIDsFrom=askOIDsFrom):
        self.assertEqual(oids[3:-3],
            map(u64, client.oids(tid, p64(oids[3]), p64(oids[-4]))))
    # Delete all objects, in random order, at most 6 per transaction.
    random.shuffle(oids)
    while oids:
        txn = transaction.Transaction()
        storage.tpc_begin(txn)
        for i in oids[-6:]:
            oid = p64(i)
            storage.deleteObject(oid, storage.load(oid)[1], txn)
        storage.tpc_vote(txn)
        # From here, i is the tid of the deleting transaction.
        i = storage.tpc_finish(txn)
        # At the tid preceding the deletion,
        # the objects that have just been deleted are still listed.
        self.assertEqual(sorted(oids), map(u64, client.oids(tid)))
        del oids[-6:]
        tid = i
        # At the tid of the deletion, they are not listed anymore.
        self.assertEqual(sorted(oids), map(u64, client.oids(tid)))
def _testUndoConflict(self, cluster, *inc):
def waitResponses(orig, *args):
orig(*args)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment