Commit 441145e5 authored by Julien Muchembled's avatar Julien Muchembled

client: simplify iterator

parent bd03b14b
......@@ -134,12 +134,9 @@ class Storage(BaseStorage.BaseStorage,
except NEOStorageNotFoundError:
return None
def iterator(self, start=None, stop=None):
# Iterator lives in its own transaction, so get a fresh snapshot.
snapshot_tid = self.lastTransaction()
if not stop or snapshot_tid < stop:
stop = snapshot_tid
return self.app.iterator(start, stop)
@property
def iterator(self):
return self.app.iterator
# undo
def undo(self, transaction_id, txn):
......
......@@ -41,7 +41,6 @@ from .exception import NEOStorageNotFoundError
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from .poll import ThreadedPoll, psThreadedPoll
from .iterator import Iterator
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import p64, u64, parseMasterList
......@@ -939,10 +938,7 @@ class Application(object):
assert real_tid == tid, (real_tid, tid)
transaction_iter.close()
def iterator(self, start, stop):
if start is None:
start = ZERO_TID
return Iterator(self, start, stop)
from .iterator import iterator
def lastTransaction(self):
self._askPrimary(Packets.AskLastTransaction())
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage
from neo.lib.protocol import ZERO_TID, MAX_TID
from neo.lib.util import u64, add64
from .exception import NEOStorageCreationUndoneError, NEOStorageNotFoundError
......@@ -23,9 +24,6 @@ CHUNK_LENGTH = 100
class Record(BaseStorage.DataRecord):
""" BaseStorage Transaction record yielded by the Transaction object """
def __init__(self, oid, tid, data, prev):
BaseStorage.DataRecord.__init__(self, oid, tid, data, prev)
def __str__(self):
oid = u64(self.oid)
tid = u64(self.tid)
......@@ -36,109 +34,49 @@ class Record(BaseStorage.DataRecord):
class Transaction(BaseStorage.TransactionRecord):
""" Transaction object yielded by the NEO iterator """
def __init__(self, app, tid, status, user, desc, ext, oid_list,
prev_serial_dict):
BaseStorage.TransactionRecord.__init__( self, tid, status, user, desc,
ext)
def __init__(self, app, txn, prev_serial_dict):
super(Transaction, self).__init__(txn['id'], ' ',
txn['user_name'], txn['description'], txn['ext'])
self.app = app
self.oid_list = oid_list
self.oid_index = 0
self.history = []
self.oid_list = txn['oids']
self.prev_serial_dict = prev_serial_dict
def __iter__(self):
return self
def next(self):
""" Iterate over the transaction records """
app = self.app
oid_list = self.oid_list
oid_index = self.oid_index
oid_len = len(oid_list)
# load an object
while oid_index < oid_len:
oid = oid_list[oid_index]
load = self.app.load
for oid in self.oid_list:
try:
data, _, next_tid = app.load(oid, self.tid)
data, _, next_tid = load(oid, self.tid)
except NEOStorageCreationUndoneError:
data = next_tid = None
except NEOStorageNotFoundError:
# Transactions are not updated after a pack, so their object
# will not be found in the database. Skip them.
oid_list.pop(oid_index)
oid_len -= 1
continue
oid_index += 1
break
else:
# no more records for this transaction
self.oid_index = 0
raise StopIteration
self.oid_index = oid_index
record = Record(oid, self.tid, data,
self.prev_serial_dict.get(oid))
if next_tid is None:
self.prev_serial_dict.pop(oid, None)
else:
self.prev_serial_dict[oid] = self.tid
return record
def __str__(self):
tid = u64(self.tid)
args = (tid, self.user, self.status)
return 'Transaction #%s: %s %s' % args
class Iterator(object):
""" An iterator for the NEO storage """
def __init__(self, app, start, stop):
self.app = app
self._txn_list = []
assert None not in (start, stop)
self._start = start
self._stop = stop
# index of current iteration
self._index = 0
self._closed = False
# OID -> previous TID mapping
# TODO: prune old entries while walking ?
self._prev_serial_dict = {}
def __iter__(self):
return self
def __getitem__(self, index):
""" Simple index-based iterator """
if index != self._index:
raise IndexError, index
return self.next()
def next(self):
""" Return an iterator for the next transaction"""
if self._closed:
raise IOError, 'iterator closed'
if not self._txn_list:
(max_tid, chunk) = self.app.transactionLog(self._start, self._stop,
CHUNK_LENGTH)
if not chunk:
# nothing more
raise StopIteration
self._start = add64(max_tid, 1)
self._txn_list = chunk
txn = self._txn_list.pop(0)
self._index += 1
tid = txn['id']
user = txn['user_name']
desc = txn['description']
oid_list = txn['oids']
extension = txn['ext']
txn = Transaction(self.app, tid, ' ', user, desc, extension, oid_list,
self._prev_serial_dict)
return txn
if next_tid is None:
prev_tid = self.prev_serial_dict.pop(oid, None)
else:
prev_tid = self.prev_serial_dict.get(oid)
self.prev_serial_dict[oid] = self.tid
yield Record(oid, self.tid, data, prev_tid)
def __str__(self):
return 'NEO transactions iterator'
def close(self):
self._closed = True
return 'Transaction #%s: %s %s' \
% (u64(self.tid), self.user, self.status)
def iterator(app, start=None, stop=None):
"""NEO transaction iterator"""
if start is None:
start = ZERO_TID
stop = min(stop or MAX_TID, app.lastTransaction())
# OID -> previous TID mapping
# TODO: prune old entries while walking ?
prev_serial_dict = {}
while 1:
max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
if not chunk:
break # nothing more
for txn in chunk:
yield Transaction(app, txn, prev_serial_dict)
start = add64(max_tid, 1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment