Commit 16671146 authored by Hanno Schlichting

Merge in the essential parts of unimr's queryplan

parent 37eb25f1
@@ -482,7 +482,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
                 continue
             order.append((ILimitedResultIndex.providedBy(index), name))
         order.sort()
-        return order
+        return [i[1] for i in order]

     def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
         """Iterate through the indexes, applying the query to each one. If
@@ -506,17 +506,22 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
         # Canonicalize the request into a sensible query before passing it on
         query = self.make_query(query)

         cr = self.getCatalogReport(query)
         cr.start()

-        for limit_result, i in self._sorted_search_indexes(query):
+        plan = cr.plan()
+        if not plan:
+            plan = self._sorted_search_indexes(query)
+
+        for i in plan:
             index = self.getIndex(i)
             _apply_index = getattr(index, "_apply_index", None)
             if _apply_index is None:
                 continue

-            cr.split(i)
-            if limit_result:
+            cr.start_split(i)
+            if ILimitedResultIndex.providedBy(index):
                 r = _apply_index(query, rs)
             else:
                 r = _apply_index(query)
@@ -528,14 +533,15 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
             # once we don't need to support the "return everything" case
             # anymore
             if r is not None and not r:
-                cr.split(i, None)
+                cr.stop_split(i, None)
                 return LazyCat([])
-            cr.split(i, r)
+            cr.stop_split(i, r)
             w, rs = weightedIntersection(rs, r)
             if not rs:
                 break
         else:
-            cr.split(i, None)
+            cr.stop_split(i, None)
         cr.stop()
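Taken together, these hunks make search() first ask the catalog report for a plan: a list of index names ordered by how those indexes performed for the same query shape in earlier searches. Only when no benchmark has been recorded yet does it fall back to _sorted_search_indexes(), whose static ordering applies indexes that do not provide ILimitedResultIndex first. The sketch below restates that control flow outside the Catalog class; choose_plan, report and sorted_indexes are illustrative stand-ins for cr and self._sorted_search_indexes, not part of the actual API.

def choose_plan(report, sorted_indexes, query):
    # Prefer the ordering learned from earlier runs of this query shape.
    plan = report.plan()
    if not plan:
        # Cold start: no benchmark recorded yet, so use the static
        # ordering (ILimitedResultIndex indexes sort last).
        plan = sorted_indexes(query)
    return plan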
@@ -25,6 +25,7 @@ value_indexes_lock = allocate_lock()
 value_indexes = frozenset()

 MAX_DISTINCT_VALUES = 10
+REFRESH_RATE = 100


 def determine_value_indexes(indexes):
@@ -101,67 +102,121 @@ def make_key(catalog, query):
     return key


-class StopWatch(object):
-    """ Simple stopwatch class """
+class CatalogReport(object):
+    """Catalog report class to meassure and identify catalog queries.
+    """

-    def __init__(self):
+    def __init__(self, catalog, query=None, threshold=0.1):
         self.init()
+        self.catalog = catalog
+        self.query = query
+        self._key = None
+        self.threshold = threshold
+
+        parent = aq_parent(catalog)
+        path = getattr(aq_base(parent), 'getPhysicalPath', None)
+        if path is None:
+            path = ('', 'NonPersistentCatalog')
+        else:
+            path = tuple(parent.getPhysicalPath())
+        self.cid = path

     def init(self):
         self.res = []
         self.start_time = None
         self.interim = {}
         self.stop_time = None
         self.duration = None

+    def prioritymap(self):
+        # holds the benchmark of each index
+        prioritymap = getattr(self.catalog, '_v_prioritymap', None)
+        if prioritymap is None:
+            prioritymap = self.catalog._v_prioritymap = {}
+        return prioritymap
+
+    def benchmark(self):
+        # holds the benchmark of each index
+        return self.prioritymap().get(self.key(), None)
+
+    def plan(self):
+        benchmark = self.benchmark()
+        if not benchmark:
+            return None
+        # sort indexes on (mean hits, mean search time)
+        ranking = [((v[0], v[1]), k) for k, v in benchmark.items()]
+        ranking.sort()
+        return [i[1] for i in ranking]
+
     def start(self):
         self.init()
         self.start_time = time.time()
+        benchmark = self.benchmark()
+        if benchmark is None:
+            self.prioritymap()[self.key()] = {}

-    def split(self, label, result=None):
-        current = time.time()
-        start_time, stop_time = self.interim.get(label, (None, None))
-        if start_time is None:
-            self.interim[label] = (current, None)
-            return
+    def start_split(self, label, result=None):
+        self.interim[label] = (time.time(), None)

+    def stop_split(self, name, result=None):
+        current = time.time()
+        start_time, stop_time = self.interim.get(name, (None, None))
         length = 0
         if result is not None:
             # TODO: calculating the length can be expensive
             length = len(result)
-        self.interim[label] = (start_time, current)
-        self.res.append((label, current - start_time, length))
+        self.interim[name] = (start_time, current)
+        dt = current - start_time
+        self.res.append((name, current - start_time, length))
+
+        # remember index's hits, search time and calls
+        benchmark = self.benchmark()
+        if name not in benchmark:
+            benchmark[name] = (length, dt, 1)
+        else:
+            n, t, c = benchmark[name]
+            n = int(((n*c) + length) / float(c + 1))
+            t = ((t*c) + dt) / float(c + 1)
+            # reset adaption
+            if c % REFRESH_RATE == 0:
+                c = 0
+            c += 1
+            benchmark[name] = (n, t, c)

     def stop(self):
         self.end_time = time.time()
         self.duration = self.end_time - self.start_time

-    def result(self):
-        return (self.end_time - self.start_time, tuple(self.res))
-
-
-class CatalogReport(StopWatch):
-    """Catalog report class to meassure and identify catalog queries.
-    """
+        key = self.key()
+        benchmark = self.benchmark()
+        prioritymap = self.prioritymap()
+        prioritymap[key] = benchmark

-    def __init__(self, catalog, query=None, threshold=0.1):
-        super(CatalogReport, self).__init__()
-        self.catalog = catalog
-        self.query = query
-        self.threshold = threshold
+        # calculate mean time of search
+        stats = getattr(self.catalog, '_v_stats', None)
+        if stats is None:
+            stats = self.catalog._v_stats = {}

-        parent = aq_parent(catalog)
-        path = getattr(aq_base(parent), 'getPhysicalPath', None)
-        if path is None:
-            path = ('', 'NonPersistentCatalog')
+        if key not in stats:
+            mt = self.duration
+            c = 1
         else:
-            path = tuple(parent.getPhysicalPath())
-        self.cid = path
+            mt, c = stats[key]
+            mt = ((mt * c) + self.duration) / float(c + 1)
+            c += 1

-    def stop(self):
-        super(CatalogReport, self).stop()
+        stats[key] = (mt, c)
         self.log()

+    def result(self):
+        return (self.duration, tuple(self.res))
+
+    def key(self):
+        if not self._key:
+            self._key = make_key(self.catalog, self.query)
+        return self._key
+
     def log(self):
         # result of stopwatch
         res = self.result()
@@ -171,7 +226,7 @@ class CatalogReport(StopWatch):
         # The key calculation takes a bit itself, we want to avoid that for
         # any fast queries. This does mean that slow queries get the key
         # calculation overhead added to their runtime.
-        key = make_key(self.catalog, self.query)
+        key = self.key()
         reports_lock.acquire()
         try:
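The report now keeps two volatile caches on the catalog. _v_prioritymap maps a query key to a per-index benchmark tuple of (mean hits, mean search time, call count), which stop_split() maintains as an incremental average and which plan() sorts on (mean hits, mean search time) so cheaper, more selective indexes run first. _v_stats records a mean search duration per query key in stop(). A standalone sketch of the averaging step, assuming the same tuple layout as the diff; the update_benchmark name is illustrative, not part of the module:

REFRESH_RATE = 100

def update_benchmark(benchmark, name, hits, duration):
    # First call for this index under this query key: seed the tuple.
    if name not in benchmark:
        benchmark[name] = (hits, duration, 1)
        return
    n, t, c = benchmark[name]
    # Incremental mean of result size and search time over c + 1 calls.
    n = int(((n * c) + hits) / float(c + 1))
    t = ((t * c) + duration) / float(c + 1)
    # Reset the call count every REFRESH_RATE calls so old measurements
    # age out and the plan can adapt when the data or query mix changes.
    if c % REFRESH_RATE == 0:
        c = 0
    benchmark[name] = (n, t, c + 1)

Keeping only a running mean and a counter keeps the bookkeeping O(1) per index, at the cost of the periodic reset being the only way for stale measurements to lose their weight.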
@@ -290,19 +290,18 @@ class TestCatalog(CatalogBase, unittest.TestCase):
     def test_sorted_search_indexes_one(self):
         result = self._catalog._sorted_search_indexes({'att1': 'a'})
-        self.assertEquals(result, [(True, 'att1')])
+        self.assertEquals(result, ['att1'])

     def test_sorted_search_indexes_many(self):
         query = {'att1': 'a', 'att2': 'b', 'num': 1}
         result = self._catalog._sorted_search_indexes(query)
-        indexes = [r[1] for r in result]
-        self.assertEquals(set(indexes), set(['att1', 'att2', 'num']))
+        self.assertEquals(set(result), set(['att1', 'att2', 'num']))

     def test_sorted_search_indexes_priority(self):
         # att2 and col2 don't support ILimitedResultIndex, att1 does
         query = {'att1': 'a', 'att2': 'b', 'col2': 'c'}
         result = self._catalog._sorted_search_indexes(query)
-        self.assertEquals(result.index((True, 'att1')), 2)
+        self.assertEquals(result.index('att1'), 2)

     # search
     # sortResults
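Because both caches live in volatile _v_ attributes, they are per-process, rebuilt as queries come in, and easy to inspect from a debug prompt. A rough illustration, assuming catalog is a ZCatalog's _catalog instance and that the same query has already been executed at least once:

query = {'att1': 'a', 'att2': 'b'}
report = catalog.getCatalogReport(query)

# Learned index ordering for this query key, or None before any benchmark exists.
print report.plan()

# {query key: {index name: (mean hits, mean search time, calls)}}
print getattr(catalog, '_v_prioritymap', None)

# {query key: (mean search duration, calls)}
print getattr(catalog, '_v_stats', None)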