Commit eeb96fa3 authored by Casey Duncan

Merge casey-zctextindex-fewer-conflicts-branch:

  - Indexes and the Lexicon are now much less likely to generate write
    conflicts. Previously, *any* concurrent index/unindex operations would
    conflict.

  - Performance and scalability fix for queries
parent 879240f9
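
Background for the hunks that follow: the fix replaces plain integer counters
with BTrees.Length.Length objects. A Length supplies _p_resolveConflict, so
when two transactions both bump the counter, ZODB merges the two deltas
instead of raising ConflictError; only transactions touching the same BTree
buckets still collide. A minimal sketch of the pattern, in the style of the
tests added below (the Counted class and the demo.fs name are made up for
illustration):

    from ZODB.FileStorage import FileStorage
    from ZODB.DB import DB
    from BTrees.Length import Length
    from Persistence import Persistent

    class Counted(Persistent):
        def __init__(self):
            self.length = Length()   # merges concurrent changes by summing deltas

    db = DB(FileStorage('demo.fs'))
    r1 = db.open().root()
    r1['c'] = Counted()
    get_transaction().commit()
    copy = db.open().root()['c']
    copy.length()                    # load the second copy's state now
    r1['c'].length.change(1)
    get_transaction().commit()
    copy.length.change(1)            # change applied against the stale copy
    get_transaction().commit()       # no ConflictError: Length resolves to 2
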
@@ -20,7 +20,7 @@ import math
from BTrees.IOBTree import IOBTree
from BTrees.IIBTree import IIBTree, IIBucket, IITreeSet
from BTrees.IIBTree import intersection, difference
import BTrees.Length
from BTrees.Length import Length
from Products.ZCTextIndex.IIndex import IIndex
from Products.ZCTextIndex import WidCode
@@ -83,12 +83,18 @@ class BaseIndex(Persistent):
self._docwords = IOBTree()
# Use a BTree length for efficient length computation w/o conflicts
self.length = BTrees.Length.Length()
self.length = Length()
self.document_count = Length()
def length(self):
"""Return the number of words in the index."""
# This is overridden per instance
return len(self._wordinfo)
def document_count(self):
"""Return the number of documents in the index"""
# This is overridden per instance
return len(self._docweight)
def get_words(self, docid):
"""Return a list of the wordids for a given docid."""
@@ -104,6 +110,11 @@ class BaseIndex(Persistent):
self._mass_add_wordinfo(wid2weight, docid)
self._docweight[docid] = docweight
self._docwords[docid] = WidCode.encode(wids)
try:
self.document_count.change(1)
except AttributeError:
# Upgrade document_count to Length object
self.document_count = Length(self.document_count())
return len(wids)
# A subclass may wish to extend or override this. This is for adjusting
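
The try/except is the upgrade path for the write side. On an old instance
document_count is still the bound method above; bound methods have no change
attribute, so the AttributeError handler calls that same method and seeds a
Length with its answer (which already reflects the document just added, since
it is computed from _docweight). The counter is therefore migrated only during
an operation that writes the index anyway. The idiom, reduced to a skeleton
(hypothetical self.count attribute):

    try:
        self.count.change(delta)          # new instances: a Length object
    except AttributeError:
        # old instances: self.count is the fallback method, and its return
        # value already includes this operation, so seed the Length directly
        self.count = Length(self.count())
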
@@ -165,6 +176,11 @@ class BaseIndex(Persistent):
self._del_wordinfo(wid, docid)
del self._docwords[docid]
del self._docweight[docid]
try:
self.document_count.change(-1)
except AttributeError:
# Upgrade document_count to Length object
self.document_count = Length(self.document_count())
def search(self, term):
wids = self._lexicon.termToWordIds(term)
......
@@ -69,7 +69,7 @@ class CosineIndex(BaseIndex):
def _search_wids(self, wids):
if not wids:
return []
N = float(len(self._docweight))
N = float(self.document_count())
L = []
DictType = type({})
for wid in wids:
@@ -86,7 +86,7 @@ class CosineIndex(BaseIndex):
wids = []
for term in terms:
wids += self._lexicon.termToWordIds(term)
N = float(len(self._docweight))
N = float(self.document_count())
sum = 0.0
for wid in self._remove_oov_wids(wids):
wt = inverse_doc_frequency(len(self._wordinfo[wid]), N)
......
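
Both cosine hunks make the same substitution: N used to be computed as
len(self._docweight), and len() on a BTree walks every bucket, so each query
loaded the whole tree just to count documents. document_count() reads one
small Length record instead; this is the "performance and scalability fix for
queries" from the commit message. N then feeds inverse_doc_frequency, which
(a hedged reconstruction of the helper these loops call, not shown in this
diff) is the usual smoothed idf:

    import math

    def inverse_doc_frequency(term_count, num_items):
        # idf(t) = log(1 + N / f(t)), where f(t) is the number of documents
        # containing t (len(self._wordinfo[wid])) and N = document_count()
        return math.log(1.0 + num_items / term_count)

    print inverse_doc_frequency(1, 100.0)    # rare word: ~4.62
    print inverse_doc_frequency(90, 100.0)   # common word: ~0.75
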
@@ -20,6 +20,9 @@ class IIndex(Interface.Base):
"""Interface for an Index."""
def length():
"""Return the number of words in the index."""
def document_count():
"""Return the number of documents in the index."""
def get_words(docid):
@@ -62,10 +65,13 @@ class IIndex(Interface.Base):
"""
def index_doc(docid, text):
"XXX"
"""Add a document with the specified id and text to the index. If a
document by that id already exists, replace its text with the new
text provided.
"""
def unindex_doc(docid):
"XXX"
"""Remove the document with the specified id from the index"""
def has_doc(docid):
"""Returns true if docid is an id of a document in the index"""
@@ -16,6 +16,7 @@ import re
from BTrees.IOBTree import IOBTree
from BTrees.OIBTree import OIBTree
from BTrees.Length import Length
import ZODB
from Persistence import Persistent
@@ -37,16 +38,13 @@ class Lexicon(Persistent):
# we never saw before, and that isn't a known stopword (or otherwise
# filtered out). Returning a special wid value for OOV words is a
# way to let clients know when an OOV word appears.
self._nextwid = 1
self.length = Length()
self._pipeline = pipeline
# Keep some statistics about indexing
self._nbytes = 0 # Number of bytes indexed (at start of pipeline)
self._nwords = 0 # Number of words indexed (after pipeline)
def length(self):
"""Return the number of unique terms in the lexicon."""
return self._nextwid - 1
# Overridden in instances
return len(self._wids)
def words(self):
return self._wids.keys()
@@ -59,11 +57,15 @@ class Lexicon(Persistent):
def sourceToWordIds(self, text):
last = _text2list(text)
for t in last:
self._nbytes += len(t)
for element in self._pipeline:
last = element.process(last)
self._nwords += len(last)
if not hasattr(self.length, 'change'):
# Make sure length is overridden with a BTrees.Length.Length
self.length = Length(self.length())
# Strategically unload the length value so that we get the most
# recent value written to the database to minimize conflicting wids
# XXX this will not work when MVCC is implemented in the ZODB...
self.length._p_deactivate()
return map(self._getWordIdCreate, last)
def termToWordIds(self, text):
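
The _p_deactivate() call is the subtle line here. It turns the Length object
back into a ghost, so the next access reloads whatever value was last
committed rather than reusing this connection's cached copy. Concurrent
indexing transactions thus start wid allocation from the freshest counter
they can see and mostly hand out disjoint wids, leaving conflict resolution
for the rare overlap; as the XXX says, under MVCC every transaction would
reload its own snapshot and the trick would buy nothing. The mechanism in
isolation (assuming lexicon is loaded from an open connection):

    length = lexicon.length
    length._p_deactivate()       # drop cached state; the object becomes a ghost
    print length._p_changed      # None: ghosts have no state loaded
    print length()               # touching it reloads the latest committed value
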
@@ -138,9 +140,10 @@ class Lexicon(Persistent):
return wid
def _new_wid(self):
wid = self._nextwid
self._nextwid += 1
return wid
self.length.change(1)
while self._words.has_key(self.length()): # just to be safe
self.length.change(1)
return self.length()
def _text2list(text):
# Helper: splitter input may be a string or a list of strings
......
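
_new_wid now derives fresh wids from the Length counter instead of the
removed _nextwid attribute, and length() drops the _nextwid arithmetic for a
BTree-backed fallback. After a resolved conflict the counter jumps by the sum
of both transactions' increments, so the has_key loop skips any value that is
somehow already a key of _words before handing it out. A toy rendition, with
a dict standing in for the wid-to-word BTree:

    from BTrees.Length import Length

    _words = {1: 'how', 2: 'now'}            # stand-in for the _words BTree
    length = Length(2)

    def _new_wid():
        length.change(1)
        while _words.has_key(length()):      # skip wids already handed out
            length.change(1)
        return length()

    print _new_wid()                         # 3
    _words[4] = 'cow'                        # pretend 4 is already taken
    print _new_wid()                         # 5: the counter skips past 4
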
@@ -18,6 +18,7 @@
# understand what's going on.
from BTrees.IIBTree import IIBucket
from BTrees.Length import Length
from Products.ZCTextIndex.IIndex import IIndex
from Products.ZCTextIndex.BaseIndex import BaseIndex, \
@@ -50,20 +51,29 @@ class OkapiIndex(BaseIndex):
# sum(self._docweight.values()), the total # of words in all docs
# This is a long for "better safe than sorry" reasons. It isn't
# used often enough that speed should matter.
self._totaldoclen = 0L
# Use a BTrees.Length.Length object to avoid concurrent write conflicts
self._totaldoclen = Length(0L)
def index_doc(self, docid, text):
count = BaseIndex.index_doc(self, docid, text)
self._totaldoclen += count
self._change_doc_len(count)
return count
def _reindex_doc(self, docid, text):
self._totaldoclen -= self._docweight[docid]
self._change_doc_len(-self._docweight[docid])
return BaseIndex._reindex_doc(self, docid, text)
def unindex_doc(self, docid):
self._totaldoclen -= self._docweight[docid]
self._change_doc_len(-self._docweight[docid])
BaseIndex.unindex_doc(self, docid)
def _change_doc_len(self, delta):
# Change total doc length used for scoring
try:
self._totaldoclen.change(delta)
except AttributeError:
# Opportunistically upgrade _totaldoclen attribute to Length object
self._totaldoclen = Length(long(self._totaldoclen + delta))
# The workhorse. Return a list of (IIBucket, weight) pairs, one pair
# for each wid t in wids. The IIBucket, times the weight, maps D to
@@ -76,8 +86,13 @@
def _search_wids(self, wids):
if not wids:
return []
N = float(len(self._docweight)) # total # of docs
meandoclen = self._totaldoclen / N
N = float(self.document_count()) # total # of docs
try:
doclen = self._totaldoclen()
except TypeError:
# _totaldoclen has not yet been upgraded
doclen = self._totaldoclen
meandoclen = doclen / N
K1 = self.K1
B = self.B
K1_plus1 = K1 + 1.0
@@ -120,8 +135,13 @@
def _search_wids(self, wids):
if not wids:
return []
N = float(len(self._docweight)) # total # of docs
meandoclen = self._totaldoclen / N
N = float(self.document_count()) # total # of docs
try:
doclen = self._totaldoclen()
except TypeError:
# _totaldoclen has not yet been upgraded
doclen = self._totaldoclen
meandoclen = doclen / N
#K1 = self.K1
#B = self.B
#K1_plus1 = K1 + 1.0
......
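
Two things happen in these twin _search_wids hunks (the file carries two
variants of the scoring loop, the second with its constants commented out).
First, the read-side upgrade check: a migrated _totaldoclen is a callable
Length, an unmigrated one is a plain long, and calling a long raises
TypeError. Second, N and meandoclen are inputs of the Okapi BM25 score the
surrounding loops accumulate; in the textbook form the K1/B/K1_plus1 names
suggest, one term's contribution to one document is (a hedged reading, not
copied from this file):

    def bm25_term_weight(tf, idf, doclen, meandoclen, K1=1.2, B=0.75):
        # idf from inverse_doc_frequency; tf is the term count in the doc;
        # B trades off document-length normalization, K1 caps tf saturation
        lennorm = 1.0 - B + B * (doclen / meandoclen)
        return idf * tf * (K1 + 1.0) / (tf + K1 * lennorm)

    print bm25_term_weight(3, 2.0, 4.0, 2.0)   # ~2.59 for a longish doc
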
@@ -173,13 +173,11 @@ class ZCTextIndex(Persistent, Acquisition.Implicit, SimpleItem):
if text is None:
return 0
count = self.index.index_doc(docid, text)
self._p_changed = 1 # XXX
return count
def unindex_object(self, docid):
if self.index.has_doc(docid):
self.index.unindex_doc(docid)
self._p_changed = 1 # XXX
def _apply_index(self, request, cid=''):
"""Apply query specified by request, a mapping containing the query.
......
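
The deleted self._p_changed = 1 lines are the payoff at the wrapper level.
They dirtied the ZCTextIndex object on every index/unindex call, so any two
concurrent catalog writes conflicted on this one record no matter what the
BTrees underneath did; with the counters now living in Length objects,
nothing on the wrapper itself changes and the hand-dirtying can go. For
contrast, the idiom being removed is only needed when mutating non-persistent
state in place, e.g.:

    from Persistence import Persistent

    class Holder(Persistent):
        def __init__(self):
            self.data = {}            # plain dict: ZODB cannot see mutations

        def add(self, k, v):
            self.data[k] = v
            self._p_changed = 1       # so the holder must be dirtied by hand
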
@@ -441,8 +441,6 @@ class Indexer:
self.updatefolder(f, f.listmessages())
print "Total", len(self.docpaths)
self.commit()
print "Indexed", self.index.lexicon._nbytes, "bytes and",
print self.index.lexicon._nwords, "words;",
print len(self.index.lexicon._words), "unique words."
def updatefolder(self, f, msgs):
......
@@ -12,8 +12,10 @@
#
##############################################################################
import os
from unittest import TestCase, TestSuite, main, makeSuite
from BTrees.Length import Length
from Products.ZCTextIndex.Lexicon import Lexicon, Splitter
from Products.ZCTextIndex.CosineIndex import CosineIndex
from Products.ZCTextIndex.OkapiIndex import OkapiIndex
@@ -34,6 +36,8 @@ class IndexTest(TestCase):
self.assert_(self.index.has_doc(DOCID))
self.assert_(self.index._docweight[DOCID])
self.assertEqual(len(self.index._docweight), 1)
self.assertEqual(
len(self.index._docweight), self.index.document_count())
self.assertEqual(len(self.index._wordinfo), 5)
self.assertEqual(len(self.index._docwords), 1)
self.assertEqual(len(self.index.get_words(DOCID)), 5)
@@ -48,6 +52,8 @@ class IndexTest(TestCase):
self.test_index_document(DOCID)
self.index.unindex_doc(DOCID)
self.assertEqual(len(self.index._docweight), 0)
self.assertEqual(
len(self.index._docweight), self.index.document_count())
self.assertEqual(len(self.index._wordinfo), 0)
self.assertEqual(len(self.index._docwords), 0)
self.assertEqual(len(self.index._wordinfo),
@@ -60,6 +66,8 @@ class IndexTest(TestCase):
self.index.index_doc(DOCID, doc)
self.assert_(self.index._docweight[DOCID])
self.assertEqual(len(self.index._docweight), 2)
self.assertEqual(
len(self.index._docweight), self.index.document_count())
self.assertEqual(len(self.index._wordinfo), 8)
self.assertEqual(len(self.index._docwords), 2)
self.assertEqual(len(self.index.get_words(DOCID)), 4)
@@ -82,6 +90,8 @@ class IndexTest(TestCase):
self.index.unindex_doc(1)
DOCID = 2
self.assertEqual(len(self.index._docweight), 1)
self.assertEqual(
len(self.index._docweight), self.index.document_count())
self.assert_(self.index._docweight[DOCID])
self.assertEqual(len(self.index._wordinfo), 4)
self.assertEqual(len(self.index._docwords), 1)
@@ -101,6 +111,8 @@ class IndexTest(TestCase):
self.assertEqual(len(self.index.get_words(DOCID)), 7)
self.assertEqual(len(self.index._wordinfo),
self.index.length())
self.assertEqual(
len(self.index._docweight), self.index.document_count())
wids = self.lexicon.termToWordIds("repeat")
self.assertEqual(len(wids), 1)
repititive_wid = wids[0]
@@ -145,9 +157,130 @@ class CosineIndexTest(IndexTest):
class OkapiIndexTest(IndexTest):
IndexFactory = OkapiIndex
class TestIndexConflict(TestCase):
storage = None
def tearDown(self):
if self.storage is not None:
self.storage.close()
def openDB(self):
from ZODB.FileStorage import FileStorage
from ZODB.DB import DB
n = 'fs_tmp__%s' % os.getpid()
self.storage = FileStorage(n)
self.db = DB(self.storage)
def test_index_doc_conflict(self):
self.index = OkapiIndex(Lexicon())
self.openDB()
r1 = self.db.open().root()
r1['i'] = self.index
get_transaction().commit()
r2 = self.db.open().root()
copy = r2['i']
# Make sure the data is loaded
list(copy._docweight.items())
list(copy._docwords.items())
list(copy._wordinfo.items())
list(copy._lexicon._wids.items())
list(copy._lexicon._words.items())
self.assertEqual(self.index._p_serial, copy._p_serial)
self.index.index_doc(0, 'The time has come')
get_transaction().commit()
copy.index_doc(1, 'That time has gone')
get_transaction().commit()
def test_reindex_doc_conflict(self):
self.index = OkapiIndex(Lexicon())
self.index.index_doc(0, 'Sometimes change is good')
self.index.index_doc(1, 'Then again, who asked')
self.openDB()
r1 = self.db.open().root()
r1['i'] = self.index
get_transaction().commit()
r2 = self.db.open().root()
copy = r2['i']
# Make sure the data is loaded
list(copy._docweight.items())
list(copy._docwords.items())
list(copy._wordinfo.items())
list(copy._lexicon._wids.items())
list(copy._lexicon._words.items())
self.assertEqual(self.index._p_serial, copy._p_serial)
self.index.index_doc(0, 'Sometimes change isn\'t bad')
get_transaction().commit()
copy.index_doc(1, 'Then again, who asked you?')
get_transaction().commit()
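
Both conflict tests follow one recipe: store the index, open a second
connection, force-load every subobject of the copy so its in-memory state is
pinned at the same serial (the _p_serial assertion checks this), commit a
change through the first connection, then commit through the now-stale copy.
The second commit succeeds only if every object written on both sides can
resolve its own conflict. The same recipe with a counter that cannot resolve
shows the failure the old code produced (a sketch reusing the openDB/two-root
setup above; the Plain class is made up):

    from ZODB.POSException import ConflictError
    from Persistence import Persistent

    class Plain(Persistent):
        def __init__(self):
            self.n = 0                   # a bare int has no _p_resolveConflict

    r1['p'] = Plain()
    get_transaction().commit()
    p2 = r2['p']; p2.n                   # load the copy, pinning its serial
    r1['p'].n = r1['p'].n + 1
    get_transaction().commit()
    p2.n = p2.n + 1
    try:
        get_transaction().commit()       # raises ConflictError
    except ConflictError:
        get_transaction().abort()
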
class TestUpgrade(TestCase):
def test_query_before_totaldoclen_upgrade(self):
self.index1 = OkapiIndex(Lexicon(Splitter()))
self.index1.index_doc(0, 'The quiet of night')
# Revert index1 back to a long to simulate an older index instance
self.index1._totaldoclen = long(self.index1._totaldoclen())
self.assertEqual(len(self.index1.search('night')), 1)
def test_upgrade_totaldoclen(self):
self.index1 = OkapiIndex(Lexicon())
self.index2 = OkapiIndex(Lexicon())
self.index1.index_doc(0, 'The quiet of night')
self.index2.index_doc(0, 'The quiet of night')
# Revert index1 back to a long to simulate an older index instance
self.index1._totaldoclen = long(self.index1._totaldoclen())
self.index1.index_doc(1, 'gazes upon my shadow')
self.index2.index_doc(1, 'gazes upon my shadow')
self.assertEqual(
self.index1._totaldoclen(), self.index2._totaldoclen())
self.index1._totaldoclen = long(self.index1._totaldoclen())
self.index1.unindex_doc(0)
self.index2.unindex_doc(0)
self.assertEqual(
self.index1._totaldoclen(), self.index2._totaldoclen())
def test_query_before_document_count_upgrade(self):
self.index1 = OkapiIndex(Lexicon(Splitter()))
self.index1.index_doc(0, 'The quiet of night')
# Revert index1 back to a long to simulate an older index instance
del self.index1.document_count
self.assertEqual(len(self.index1.search('night')), 1)
def test_upgrade_document_count(self):
self.index1 = OkapiIndex(Lexicon())
self.index2 = OkapiIndex(Lexicon())
self.index1.index_doc(0, 'The quiet of night')
self.index2.index_doc(0, 'The quiet of night')
# Revert index1 back to simulate an older index instance
del self.index1.document_count
self.index1.index_doc(1, 'gazes upon my shadow')
self.index2.index_doc(1, 'gazes upon my shadow')
self.assert_(self.index1.document_count.__class__ is Length)
self.assertEqual(
self.index1.document_count(), self.index2.document_count())
del self.index1.document_count
self.index1.unindex_doc(0)
self.index2.unindex_doc(0)
self.assert_(self.index1.document_count.__class__ is Length)
self.assertEqual(
self.index1.document_count(), self.index2.document_count())
def test_suite():
return TestSuite((makeSuite(CosineIndexTest),
makeSuite(OkapiIndexTest),
makeSuite(TestIndexConflict),
makeSuite(TestUpgrade),
))
if __name__=='__main__':
......
@@ -12,9 +12,11 @@
#
##############################################################################
import sys
import os, sys
from unittest import TestCase, TestSuite, main, makeSuite
import ZODB
from Products.ZCTextIndex.Lexicon import Lexicon
from Products.ZCTextIndex.Lexicon import Splitter, CaseNormalizer
@@ -134,9 +136,59 @@ class Test(TestCase):
words = HTMLWordSplitter().process(words)
self.assertEqual(words, expected)
locale.setlocale(locale.LC_ALL, loc) # restore saved locale
def testUpgradeLength(self):
from BTrees.Length import Length
lexicon = Lexicon(Splitter())
del lexicon.length # Older instances don't override length
lexicon.sourceToWordIds('how now brown cow')
self.assert_(lexicon.length.__class__ is Length)
class TestLexiconConflict(TestCase):
storage = None
def tearDown(self):
if self.storage is not None:
self.storage.close()
def openDB(self):
from ZODB.FileStorage import FileStorage
from ZODB.DB import DB
n = 'fs_tmp__%s' % os.getpid()
self.storage = FileStorage(n)
self.db = DB(self.storage)
def testAddWordConflict(self):
self.l = Lexicon(Splitter())
self.openDB()
r1 = self.db.open().root()
r1['l'] = self.l
get_transaction().commit()
r2 = self.db.open().root()
copy = r2['l']
# Make sure the data is loaded
list(copy._wids.items())
list(copy._words.items())
copy.length()
self.assertEqual(self.l._p_serial, copy._p_serial)
self.l.sourceToWordIds('mary had a little lamb')
get_transaction().commit()
copy.sourceToWordIds('whose fleece was')
copy.sourceToWordIds('white as snow')
get_transaction().commit()
self.assertEqual(copy.length(), 11)
self.assertEqual(copy.length(), len(copy._words))
def test_suite():
return makeSuite(Test)
suite = TestSuite()
suite.addTest(makeSuite(Test))
suite.addTest(makeSuite(TestLexiconConflict))
return suite
if __name__=='__main__':
main(defaultTest='test_suite')
@@ -331,7 +331,7 @@ class OkapiIndexTests(ZCIndexTestsBase, testIndex.OkapiIndexTest):
self._checkAbsoluteScores()
def _checkAbsoluteScores(self):
self.assertEqual(self.index._totaldoclen, 6)
self.assertEqual(self.index._totaldoclen(), 6)
# So the mean doc length is 2. We use that later.
r, num = self.zc_index.query("one")
......