Commit a703a21b authored by Gregory P. Smith's avatar Gregory P. Smith

* Use weakref's of DBCursor objects for the iterator cursors to avoid a

  memory leak that would've occurred for all iterators that were
  destroyed before having iterated until they raised StopIteration.

* Simplify some code.

* Add new test cases to check for the memleak and ensure that mixing
  iteration with modification of the values for existing keys works.
parent 83c18746
...@@ -67,77 +67,76 @@ import sys ...@@ -67,77 +67,76 @@ import sys
if sys.version >= '2.3': if sys.version >= '2.3':
exec """ exec """
import UserDict import UserDict
from weakref import ref
class _iter_mixin(UserDict.DictMixin): class _iter_mixin(UserDict.DictMixin):
def _make_iter_cursor(self):
cur = self.db.cursor()
key = id(cur)
self._cursor_refs[key] = ref(cur, self._gen_cref_cleaner(key))
return cur
def _gen_cref_cleaner(self, key):
# use generate the function for the weakref callback here
# to ensure that we do not hold a strict reference to cur
# in the callback.
return lambda ref: self._cursor_refs.pop(key, None)
def __iter__(self): def __iter__(self):
try: try:
cur = self.db.cursor() cur = self._make_iter_cursor()
self._iter_cursors[str(cur)] = cur
# FIXME-20031102-greg: race condition. cursor could
# be closed by another thread before this call.
# since we're only returning keys, we call the cursor # since we're only returning keys, we call the cursor
# methods with flags=0, dlen=0, dofs=0 # methods with flags=0, dlen=0, dofs=0
curkey = cur.first(0,0,0)[0] key = cur.first(0,0,0)[0]
yield curkey yield key
next = cur.next next = cur.next
while 1: while 1:
try: try:
curkey = next(0,0,0)[0] key = next(0,0,0)[0]
yield curkey yield key
except _bsddb.DBCursorClosedError: except _bsddb.DBCursorClosedError:
# our cursor object was closed since we last yielded cur = self._make_iter_cursor()
# create a new one and attempt to reposition to the
# right place
cur = self.db.cursor()
self._iter_cursors[str(cur)] = cur
# FIXME-20031101-greg: race condition. cursor could # FIXME-20031101-greg: race condition. cursor could
# be closed by another thread before this set call. # be closed by another thread before this call.
try: cur.set(key,0,0,0)
cur.set(curkey,0,0,0)
except _bsddb.DBCursorClosedError:
# halt iteration on race condition...
raise _bsddb.DBNotFoundError
next = cur.next next = cur.next
except _bsddb.DBNotFoundError: except _bsddb.DBNotFoundError:
try: return
del self._iter_cursors[str(cur)] except _bsddb.DBCursorClosedError:
except KeyError: # the database was modified during iteration. abort.
pass
return return
def iteritems(self): def iteritems(self):
try: try:
cur = self.db.cursor() cur = self._make_iter_cursor()
self._iter_cursors[str(cur)] = cur
# FIXME-20031102-greg: race condition. cursor could
# be closed by another thread before this call.
kv = cur.first() kv = cur.first()
curkey = kv[0] key = kv[0]
yield kv yield kv
next = cur.next next = cur.next
while 1: while 1:
try: try:
kv = next() kv = next()
curkey = kv[0] key = kv[0]
yield kv yield kv
except _bsddb.DBCursorClosedError: except _bsddb.DBCursorClosedError:
# our cursor object was closed since we last yielded cur = self._make_iter_cursor()
# create a new one and attempt to reposition to the
# right place
cur = self.db.cursor()
self._iter_cursors[str(cur)] = cur
# FIXME-20031101-greg: race condition. cursor could # FIXME-20031101-greg: race condition. cursor could
# be closed by another thread before this set call. # be closed by another thread before this call.
try: cur.set(key,0,0,0)
cur.set(curkey,0,0,0)
except _bsddb.DBCursorClosedError:
# halt iteration on race condition...
raise _bsddb.DBNotFoundError
next = cur.next next = cur.next
except _bsddb.DBNotFoundError: except _bsddb.DBNotFoundError:
try: return
del self._iter_cursors[str(cur)] except _bsddb.DBCursorClosedError:
except KeyError: # the database was modified during iteration. abort.
pass
return return
""" """
else: else:
...@@ -159,7 +158,7 @@ class _DBWithCursor(_iter_mixin): ...@@ -159,7 +158,7 @@ class _DBWithCursor(_iter_mixin):
# thread while doing a put or delete in another thread. The # thread while doing a put or delete in another thread. The
# reason is that _checkCursor and _closeCursors are not atomic # reason is that _checkCursor and _closeCursors are not atomic
# operations. Doing our own locking around self.dbc, # operations. Doing our own locking around self.dbc,
# self.saved_dbc_key and self._iter_cursors could prevent this. # self.saved_dbc_key and self._cursor_refs could prevent this.
# TODO: A test case demonstrating the problem needs to be written. # TODO: A test case demonstrating the problem needs to be written.
# self.dbc is a DBCursor object used to implement the # self.dbc is a DBCursor object used to implement the
...@@ -169,15 +168,11 @@ class _DBWithCursor(_iter_mixin): ...@@ -169,15 +168,11 @@ class _DBWithCursor(_iter_mixin):
# a collection of all DBCursor objects currently allocated # a collection of all DBCursor objects currently allocated
# by the _iter_mixin interface. # by the _iter_mixin interface.
self._iter_cursors = {} self._cursor_refs = {}
def __del__(self): def __del__(self):
self.close() self.close()
def _get_dbc(self):
return self.dbc
def _checkCursor(self): def _checkCursor(self):
if self.dbc is None: if self.dbc is None:
self.dbc = self.db.cursor() self.dbc = self.db.cursor()
...@@ -197,7 +192,10 @@ class _DBWithCursor(_iter_mixin): ...@@ -197,7 +192,10 @@ class _DBWithCursor(_iter_mixin):
self.saved_dbc_key = c.current(0,0,0)[0] self.saved_dbc_key = c.current(0,0,0)[0]
c.close() c.close()
del c del c
map(lambda c: c.close(), self._iter_cursors.values()) for cref in self._cursor_refs.values():
c = cref()
if c is not None:
c.close()
def _checkOpen(self): def _checkOpen(self):
if self.db is None: if self.db is None:
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
Adapted to unittest format and expanded scope by Raymond Hettinger Adapted to unittest format and expanded scope by Raymond Hettinger
""" """
import os, sys import os, sys
import copy
import bsddb import bsddb
import dbhash # Just so we know it's imported import dbhash # Just so we know it's imported
import unittest import unittest
...@@ -64,6 +65,56 @@ class TestBSDDB(unittest.TestCase): ...@@ -64,6 +65,56 @@ class TestBSDDB(unittest.TestCase):
self.assertSetEquals(d.itervalues(), f.itervalues()) self.assertSetEquals(d.itervalues(), f.itervalues())
self.assertSetEquals(d.iteritems(), f.iteritems()) self.assertSetEquals(d.iteritems(), f.iteritems())
def test_iter_while_modifying_values(self):
if not hasattr(self.f, '__iter__'):
return
di = iter(self.d)
while 1:
try:
key = di.next()
self.d[key] = 'modified '+key
except StopIteration:
break
# it should behave the same as a dict. modifying values
# of existing keys should not break iteration. (adding
# or removing keys should)
fi = iter(self.f)
while 1:
try:
key = fi.next()
self.f[key] = 'modified '+key
except StopIteration:
break
self.test_mapping_iteration_methods()
def test_iteritems_while_modifying_values(self):
if not hasattr(self.f, 'iteritems'):
return
di = self.d.iteritems()
while 1:
try:
k, v = di.next()
self.d[k] = 'modified '+v
except StopIteration:
break
# it should behave the same as a dict. modifying values
# of existing keys should not break iteration. (adding
# or removing keys should)
fi = self.f.iteritems()
while 1:
try:
k, v = fi.next()
self.f[k] = 'modified '+v
except StopIteration:
break
self.test_mapping_iteration_methods()
def test_first_next_looping(self): def test_first_next_looping(self):
items = [self.f.first()] items = [self.f.first()]
for i in xrange(1, len(self.f)): for i in xrange(1, len(self.f)):
...@@ -111,15 +162,16 @@ class TestBSDDB(unittest.TestCase): ...@@ -111,15 +162,16 @@ class TestBSDDB(unittest.TestCase):
# the cursor's read lock will deadlock the write lock request.. # the cursor's read lock will deadlock the write lock request..
# test the iterator interface (if present) # test the iterator interface (if present)
if hasattr(self, 'iteritems'): if hasattr(self.f, 'iteritems'):
if debug: print "D" if debug: print "D"
k,v = self.f.iteritems() i = self.f.iteritems()
k,v = i.next()
if debug: print "E" if debug: print "E"
self.f[k] = "please don't deadlock" self.f[k] = "please don't deadlock"
if debug: print "F" if debug: print "F"
while 1: while 1:
try: try:
k,v = self.f.iteritems() k,v = i.next()
except StopIteration: except StopIteration:
break break
if debug: print "F2" if debug: print "F2"
...@@ -144,6 +196,27 @@ class TestBSDDB(unittest.TestCase): ...@@ -144,6 +196,27 @@ class TestBSDDB(unittest.TestCase):
self.f[k] = "be gone with ye deadlocks" self.f[k] = "be gone with ye deadlocks"
self.assert_(self.f[k], "be gone with ye deadlocks") self.assert_(self.f[k], "be gone with ye deadlocks")
def test_for_cursor_memleak(self):
if not hasattr(self.f, 'iteritems'):
return
# do the bsddb._DBWithCursor _iter_mixin internals leak cursors?
nc1 = len(self.f._cursor_refs)
# create iterator
i = self.f.iteritems()
nc2 = len(self.f._cursor_refs)
# use the iterator (should run to the first yeild, creating the cursor)
k, v = i.next()
nc3 = len(self.f._cursor_refs)
# destroy the iterator; this should cause the weakref callback
# to remove the cursor object from self.f._cursor_refs
del i
nc4 = len(self.f._cursor_refs)
self.assertEqual(nc1, nc2)
self.assertEqual(nc1, nc4)
self.assert_(nc3 == nc1+1)
def test_popitem(self): def test_popitem(self):
k, v = self.f.popitem() k, v = self.f.popitem()
self.assert_(k in self.d) self.assert_(k in self.d)
......
...@@ -84,6 +84,7 @@ ...@@ -84,6 +84,7 @@
/* --------------------------------------------------------------------- */ /* --------------------------------------------------------------------- */
#include <stddef.h> /* for offsetof() */
#include <Python.h> #include <Python.h>
#include <db.h> #include <db.h>
...@@ -92,8 +93,11 @@ ...@@ -92,8 +93,11 @@
/* 40 = 4.0, 33 = 3.3; this will break if the second number is > 9 */ /* 40 = 4.0, 33 = 3.3; this will break if the second number is > 9 */
#define DBVER (DB_VERSION_MAJOR * 10 + DB_VERSION_MINOR) #define DBVER (DB_VERSION_MAJOR * 10 + DB_VERSION_MINOR)
#if DB_VERSION_MINOR > 9
#error "eek! DBVER can't handle minor versions > 9"
#endif
#define PY_BSDDB_VERSION "4.2.3" #define PY_BSDDB_VERSION "4.2.4"
static char *rcs_id = "$Id$"; static char *rcs_id = "$Id$";
...@@ -184,6 +188,12 @@ static PyObject* DBPermissionsError; /* EPERM */ ...@@ -184,6 +188,12 @@ static PyObject* DBPermissionsError; /* EPERM */
/* --------------------------------------------------------------------- */ /* --------------------------------------------------------------------- */
/* Structure definitions */ /* Structure definitions */
#if PYTHON_API_VERSION >= 1010 /* python >= 2.1 support weak references */
#define HAVE_WEAKREF
#else
#undef HAVE_WEAKREF
#endif
struct behaviourFlags { struct behaviourFlags {
/* What is the default behaviour when DB->get or DBCursor->get returns a /* What is the default behaviour when DB->get or DBCursor->get returns a
DB_NOTFOUND error? Return None or raise an exception? */ DB_NOTFOUND error? Return None or raise an exception? */
...@@ -194,7 +204,7 @@ struct behaviourFlags { ...@@ -194,7 +204,7 @@ struct behaviourFlags {
}; };
#define DEFAULT_GET_RETURNS_NONE 1 #define DEFAULT_GET_RETURNS_NONE 1
#define DEFAULT_CURSOR_SET_RETURNS_NONE 0 /* 0 in pybsddb < 4.2, python < 2.4 */ #define DEFAULT_CURSOR_SET_RETURNS_NONE 1 /* 0 in pybsddb < 4.2, python < 2.4 */
typedef struct { typedef struct {
PyObject_HEAD PyObject_HEAD
...@@ -224,6 +234,9 @@ typedef struct { ...@@ -224,6 +234,9 @@ typedef struct {
PyObject_HEAD PyObject_HEAD
DBC* dbc; DBC* dbc;
DBObject* mydb; DBObject* mydb;
#ifdef HAVE_WEAKREF
PyObject *in_weakreflist; /* List of weak references */
#endif
} DBCursorObject; } DBCursorObject;
...@@ -760,6 +773,9 @@ newDBCursorObject(DBC* dbc, DBObject* db) ...@@ -760,6 +773,9 @@ newDBCursorObject(DBC* dbc, DBObject* db)
self->dbc = dbc; self->dbc = dbc;
self->mydb = db; self->mydb = db;
#ifdef HAVE_WEAKREF
self->in_weakreflist = NULL;
#endif
Py_INCREF(self->mydb); Py_INCREF(self->mydb);
return self; return self;
} }
...@@ -769,6 +785,13 @@ static void ...@@ -769,6 +785,13 @@ static void
DBCursor_dealloc(DBCursorObject* self) DBCursor_dealloc(DBCursorObject* self)
{ {
int err; int err;
#ifdef HAVE_WEAKREF
if (self->in_weakreflist != NULL) {
PyObject_ClearWeakRefs((PyObject *) self);
}
#endif
if (self->dbc != NULL) { if (self->dbc != NULL) {
MYDB_BEGIN_ALLOW_THREADS; MYDB_BEGIN_ALLOW_THREADS;
/* If the underlying database has been closed, we don't /* If the underlying database has been closed, we don't
...@@ -4252,6 +4275,19 @@ statichere PyTypeObject DBCursor_Type = { ...@@ -4252,6 +4275,19 @@ statichere PyTypeObject DBCursor_Type = {
0, /*tp_as_sequence*/ 0, /*tp_as_sequence*/
0, /*tp_as_mapping*/ 0, /*tp_as_mapping*/
0, /*tp_hash*/ 0, /*tp_hash*/
#ifdef HAVE_WEAKREF
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_WEAKREFS, /* tp_flags */
0, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
offsetof(DBCursorObject, in_weakreflist), /* tp_weaklistoffset */
#endif
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment