Commit 5729c272 authored by Klaus Wölfel's avatar Klaus Wölfel

Extend data bucket stream to index both by ingestion order and by key

This is needed for synchronisation with a file system, where each bucket must
be addressable by key (e.g. filename). Buckets must also be addressable by
their order of ingestion, so that processing can show a progress indicator.
parent 6fa0682a
...@@ -28,11 +28,81 @@ ...@@ -28,11 +28,81 @@
############################################################################## ##############################################################################
import hashlib import hashlib
from BTrees.OOBTree import OOBTree from BTrees.OOBTree import OOBTree
from BTrees.LOBTree import LOBTree
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from Products.ERP5.Document.Document import Document from Products.ERP5.Document.Document import Document
from Products.ERP5Type import Permissions, PropertySheet from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5Type.BTreeData import PersistentString from Products.ERP5Type.BTreeData import PersistentString
class IndexSequence:
  """
  Common base for the sequence views handed out by a data bucket stream.

  Modelled on the BTrees IReadSequence interface: an instance pairs the
  owning stream with an underlying index sequence. Subclasses decide what
  a single element looks like by implementing __getitem__.
  """

  def __init__(self, data_bucket_stream, index_sequence):
    # Stored as-is; nothing is copied or evaluated here.
    self.index_sequence = index_sequence
    self.data_bucket_stream = data_bucket_stream

  def __getitem__(self, index):
    """
    Element access is left to subclasses; the base class always raises
    NotImplementedError.
    """
    raise NotImplementedError

  def __getslice__(self, index1, index2):
    """
    Return a new view of the same class over the sub-range
    [index1, index2) of the underlying index sequence.
    """
    # self.__class__ so that a subclass slice stays an instance of that
    # subclass and keeps its own __getitem__.
    return self.__class__(
      self.data_bucket_stream,
      self.index_sequence[index1:index2])
class IndexKeySequence(IndexSequence):
  """
  Sequence view whose elements are (bucket index, bucket key) tuples
  taken from the wrapped index sequence.
  """

  def __getitem__(self, index):
    """
    Return the (bucket index, bucket key) tuple found at position
    `index` of the wrapped sequence. Lookup errors raised by the wrapped
    sequence are not caught here.
    """
    # Unpack first so that an entry which is not a pair is rejected.
    position, key = self.index_sequence[index]
    return (position, key)
class IndexValueSequence(IndexSequence):
  """
  Sequence view whose elements are the buckets addressed by the keys of
  the wrapped index sequence.
  """

  def __getitem__(self, index):
    """
    Read the bucket key at position `index` of the wrapped sequence and
    return what the stream's getBucketByKey() gives for it.
    """
    wanted_key = self.index_sequence[index]
    stream = self.data_bucket_stream
    return stream.getBucketByKey(wanted_key)
class IndexItemSequence(IndexSequence):
  """
  Sequence view whose elements are (bucket index, bucket) tuples.
  """

  def __getitem__(self, index):
    """
    Take the (bucket index, bucket key) entry at position `index` of the
    wrapped sequence and return (bucket index, bucket), resolving the
    bucket through the stream's getBucketByKey().
    """
    position, key = self.index_sequence[index]
    bucket = self.data_bucket_stream.getBucketByKey(key)
    return (position, bucket)
class IndexKeyItemSequence(IndexSequence):
  """
  Sequence view whose elements are (bucket index, bucket key, bucket)
  tuples.
  """

  def __getitem__(self, index):
    """
    Take the (bucket index, bucket key) entry at position `index` of the
    wrapped sequence and return (bucket index, bucket key, bucket),
    resolving the bucket through the stream's getBucketByKey().
    """
    position, key = self.index_sequence[index]
    bucket = self.data_bucket_stream.getBucketByKey(key)
    return (position, key, bucket)
class DataBucketStream(Document): class DataBucketStream(Document):
""" """
Represents data stored in many small files inside a "stream". Represents data stored in many small files inside a "stream".
...@@ -53,67 +123,215 @@ class DataBucketStream(Document): ...@@ -53,67 +123,215 @@ class DataBucketStream(Document):
) )
def __init__(self, id, **kw): def __init__(self, id, **kw):
self.initTree() self.initBucketTree()
self.initIndexTree()
Document.__init__(self, id, **kw) Document.__init__(self, id, **kw)
def __len__(self): def __len__(self):
return len(self._tree) return len(self._tree)
def initTree(self): def initBucketTree(self):
""" """
Initialize the Tree Initialize the Bucket Tree
""" """
self._tree = OOBTree() self._tree = OOBTree()
def _getOb(self,id, *args, **kw): def initIndexTree(self):
"""
Initialize the Index Tree
"""
self._long_index_tree = LOBTree()
def getMaxKey(self, key=None):
  """
  Return the maximum key of the bucket tree.

  key -- passed unchanged to self._tree.maxKey()

  Returns None when maxKey() raises ValueError.
  """
  try:
    max_key = self._tree.maxKey(key)
  except ValueError:
    max_key = None
  return max_key
def getMaxIndex(self, index=None):
  """
  Return the maximum index of the index tree.

  index -- passed unchanged to self._long_index_tree.maxKey()

  Returns None when maxKey() raises ValueError.
  """
  try:
    max_index = self._long_index_tree.maxKey(index)
  except ValueError:
    max_index = None
  return max_index
def getMinKey(self, key=None):
  """
  Return the minimum key of the bucket tree.

  key -- passed unchanged to self._tree.minKey()

  Returns None when minKey() raises ValueError.
  """
  try:
    min_key = self._tree.minKey(key)
  except ValueError:
    min_key = None
  return min_key
def getMinIndex(self, index=None):
  """
  Return the minimum index of the index tree.

  index -- passed unchanged to self._long_index_tree.minKey()

  Returns None when minKey() raises ValueError.
  """
  try:
    min_index = self._long_index_tree.minKey(index)
  except ValueError:
    min_index = None
  return min_index
def _getOb(self, id, *args, **kw):
return None return None
def getBucket(self, key): def getBucketByKey(self, key=None):
""" """
Get one bucket Get one bucket
""" """
return self._tree[key].value return self._tree[key].value
def getBucketByIndex(self, index=None):
  """
  Get one bucket, addressed by its ingestion index.

  index -- key into self._long_index_tree, which maps an index to the
           bucket key

  Returns whatever getBucketByKey() returns for the mapped key. Lookup
  errors from the index tree or from getBucketByKey() are not caught.
  """
  key = self._long_index_tree[index]
  # getBucketByKey() already unwraps the stored object with `.value`;
  # unwrapping a second time here raised AttributeError on the result.
  return self.getBucketByKey(key)
def hasBucketKey(self, key):
  """
  Whether a bucket with this key exists.

  Returns the result of self._tree.has_key(key) unchanged.
  """
  bucket_tree = self._tree
  return bucket_tree.has_key(key)
def hasBucketIndex(self, index):
  """
  Whether a bucket with this index exists.

  Returns the result of self._long_index_tree.has_key(index) unchanged.
  """
  index_tree = self._long_index_tree
  return index_tree.has_key(index)
def insertBucket(self, key, value):
  """
  Insert one bucket and register it in the ingestion-order index.

  key   -- bucket key in self._tree
  value -- bucket content; stored wrapped in a PersistentString

  Returns the result of self._tree.insert(). Per the BTrees API this is
  1 when the bucket was added and 0 when the key already existed, in
  which case nothing is changed.

  The index entry is written only after the bucket was really added.
  Writing it first, as before, left a duplicate index entry when the key
  already existed and a dangling one when the bucket insert failed.
  """
  inserted = self._tree.insert(key, PersistentString(value))
  if not inserted:
    return inserted
  # Instances without an index tree are tolerated: the original code
  # swallowed the AttributeError raised in that case.
  index_tree = getattr(self, '_long_index_tree', None)
  if index_tree is not None:
    try:
      next_index = index_tree.maxKey() + 1
    except ValueError:
      # maxKey() raised ValueError (empty index tree): start at 0.
      next_index = 0
    index_tree.insert(next_index, key)
  return inserted
def popBucket(self, key): def getBucketKeySequenceByKey(self, start_key=None, stop_key=None,
count=None, exclude_start_key=False, exclude_stop_key=False):
""" """
Remove one Bucket Get a lazy sequence of bucket keys
""" """
return self._tree.pop(key) sequence = self._tree.keys(min=start_key, max=stop_key,
excludemin=exclude_start_key,
excludemax=exclude_stop_key)
if count is None:
return sequence
return sequence[:count]
def getBucketKeySequence(self, start_key=None, count=None): def getBucketKeySequenceByIndex(self, start_index=None, stop_index=None,
count=None, exclude_start_index=False, exclude_stop_index=False):
""" """
Get a lazy sequence of bucket values Get a lazy sequence of bucket keys
"""
sequence = self._long_index_tree.values(min=start_index, max=stop_index,
excludemin=exclude_start_index,
excludemax=exclude_stop_index)
if count is None:
return sequence
return sequence[:count]
def getBucketIndexKeySequenceByIndex(self, start_index=None, stop_index=None,
count=None, exclude_start_index=False, exclude_stop_index=False):
"""
Get a lazy sequence of bucket keys
""" """
sequence = self._tree.keys(min=start_key) sequence = self._long_index_tree.items(min=start_index, max=stop_index,
excludemin=exclude_start_index,
excludemax=exclude_stop_index)
if count is not None:
sequence = sequence[:count]
return IndexKeySequence(self, sequence)
def getBucketIndexSequenceByIndex(self, start_index=None, stop_index=None,
count=None, exclude_start_index=False, exclude_stop_index=False):
"""
Get a lazy sequence of bucket keys
"""
sequence = self._long_index_tree.keys(min=start_index, max=stop_index,
excludemin=exclude_start_index,
excludemax=exclude_stop_index)
if count is None: if count is None:
return sequence return sequence
return sequence[:count] return sequence[:count]
def getBucketValueSequence(self, start_key=None, count=None): def getBucketValueSequenceByKey(self, start_key=None, stop_key=None,
count=None, exclude_start_key=False, exclude_stop_key=False):
""" """
Get a lazy sequence of bucket values Get a lazy sequence of bucket values
""" """
sequence = self._tree.values(min=start_key) sequence = self._tree.values(min=start_key, max=stop_key,
excludemin=exclude_start_key,
excludemax=exclude_stop_key)
if count is None: if count is None:
return sequence return sequence
return sequence[:count] return sequence[:count]
def getBucketItemSequence(self, start_key=None, count=None, def getBucketValueSequenceByIndex(self, start_index=None, stop_index=None,
exclude_start_key=False): count=None, exclude_start_index=False, exclude_stop_index=False):
"""
Get a lazy sequence of bucket values
"""
sequence = self._long_index_tree.values(min=start_index, max=stop_index,
excludemin=exclude_start_index,
excludemax=exclude_stop_index)
if count is not None:
sequence = sequence[:count]
return IndexValueSequence(self, sequence)
def getBucketKeyItemSequenceByKey(self, start_key=None, stop_key=None,
count=None, exclude_start_key=False, exclude_stop_key=False):
""" """
Get a lazy sequence of bucket items Get a lazy sequence of bucket items
""" """
sequence = self._tree.items(min=start_key, excludemin=exclude_start_key) sequence = self._tree.items(min=start_key, max=stop_key,
excludemin=exclude_start_key,
excludemax=exclude_stop_key)
if count is None: if count is None:
return sequence return sequence
return sequence[:count] return sequence[:count]
def getBucketIndexItemSequenceByIndex(self, start_index=None, stop_index=None,
  count=None, exclude_start_index=False, exclude_stop_index=False):
  """
  Get a lazy sequence of (bucket index, bucket) items, selected by index.

  start_index, stop_index -- passed as min / max to the index tree's
                             items()
  exclude_start_index, exclude_stop_index -- passed as excludemin /
                             excludemax
  count -- when not None, only the first `count` entries are kept

  Returns an IndexItemSequence wrapping the selected index entries.
  """
  index_items = self._long_index_tree.items(
    min=start_index,
    max=stop_index,
    excludemin=exclude_start_index,
    excludemax=exclude_stop_index)
  if count is None:
    return IndexItemSequence(self, index_items)
  return IndexItemSequence(self, index_items[:count])
def getBucketIndexKeyItemSequenceByIndex(self, start_index=None,
                                         stop_index=None, count=None,
                                         exclude_start_index=False,
                                         exclude_stop_index=False):
  """
  Get a lazy sequence of (bucket index, bucket key, bucket) items,
  selected by index.

  start_index, stop_index -- passed as min / max to the index tree's
                             items()
  exclude_start_index, exclude_stop_index -- passed as excludemin /
                             excludemax
  count -- when not None, only the first `count` entries are kept

  Returns an IndexKeyItemSequence wrapping the selected index entries.
  """
  index_items = self._long_index_tree.items(
    min=start_index,
    max=stop_index,
    excludemin=exclude_start_index,
    excludemax=exclude_stop_index)
  if count is None:
    return IndexKeyItemSequence(self, index_items)
  return IndexKeyItemSequence(self, index_items[:count])
def getItemList(self): def getItemList(self):
""" """
Return a list of all key, value pairs Return a list of all key, value pairs
...@@ -126,10 +344,22 @@ class DataBucketStream(Document): ...@@ -126,10 +344,22 @@ class DataBucketStream(Document):
""" """
return [key for key in self._tree.keys()] return [key for key in self._tree.keys()]
def getIndexList(self):
  """
  Return a list of all indexes, i.e. the keys of the index tree.
  """
  index_tree = self._long_index_tree
  return list(index_tree.keys())
def getIndexKeyTupleList(self):
  """
  Return a list of all (index, bucket key) tuples, i.e. the items of the
  index tree.
  """
  index_tree = self._long_index_tree
  return list(index_tree.items())
def getMd5sum(self, key): def getMd5sum(self, key):
""" """
Get hexdigest of bucket. Get hexdigest of bucket.
""" """
h = hashlib.md5() h = hashlib.md5()
h.update(self.getBucket(key)) h.update(self.getBucketByKey(key))
return h.hexdigest() return h.hexdigest()
...@@ -46,8 +46,8 @@ ...@@ -46,8 +46,8 @@
<key> <string>text_content_warning_message</string> </key> <key> <string>text_content_warning_message</string> </key>
<value> <value>
<tuple> <tuple>
<string>W: 54, 21: Redefining built-in \'id\' (redefined-builtin)</string> <string>W:124, 21: Redefining built-in \'id\' (redefined-builtin)</string>
<string>W: 67, 18: Redefining built-in \'id\' (redefined-builtin)</string> <string>W:180, 19: Redefining built-in \'id\' (redefined-builtin)</string>
</tuple> </tuple>
</value> </value>
</item> </item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment