Commit 49962b18 authored by Julien Muchembled

tests: extend bdb/rpdb2 debuggers to help debugging a set of processes

This adds a dependency on 'psutil', which is required anyway to fix
NEOProcess.isAlive.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2725 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 11a323bd
......@@ -24,7 +24,6 @@ eggs =
neostorage
neoclient
neomaster
# for unit tests
zope.testing
psutil
......@@ -16,7 +16,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import __builtin__
import errno
import os
import random
import socket
......@@ -30,9 +29,8 @@ from mock import Mock
from neo.lib import debug, logger, protocol
from neo.lib.protocol import Packets
from neo.lib.util import getAddressType
from time import time, gmtime, sleep
from time import time, gmtime
from struct import pack, unpack
from functools import wraps
DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo_')
DB_ADMIN = os.getenv('NEO_DB_ADMIN', 'root')
......@@ -507,93 +505,5 @@ class DoNothingConnector(Mock):
def getDescriptor(self):
    """Return the value stored in the ``desc`` attribute."""
    return self.desc
class SocketLock(object):
    """Basic system-wide lock

    The lock is held for as long as this object keeps a socket bound to
    ``address``: a second bind to the same address fails with EADDRINUSE,
    which is how contention is detected.
    """

    # Bound socket while the lock is held. Deleting the instance attribute
    # in release() makes lookups fall back to this class-level None.
    _socket = None

    def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
        """Remember what to bind; no socket is created here.

        For AF_UNIX, a NUL byte is prepended to ``address``.
        NOTE(review): presumably this selects the Linux abstract socket
        namespace (no filesystem entry) - confirm.
        """
        if family == socket.AF_UNIX:
            address = '\0' + address
        self.address = address
        self.socket_args = family, type

    def locked(self):
        """Return True if this object currently holds the lock."""
        return self._socket is not None

    def acquire(self, blocking=1):
        """Take the lock by binding a new socket to the address.

        Return True once the socket is bound. If the address is already
        in use, retry every second when ``blocking`` is true, otherwise
        return False. Any other socket error is propagated.
        """
        assert self._socket is None
        s = socket.socket(*self.socket_args)
        try:
            while True:
                try:
                    s.bind(self.address)
                except socket.error as e:
                    # 'as' syntax and .errno are valid on Python 2.6+ and 3,
                    # unlike the former "except X, e" / e[0] spelling.
                    if e.errno != errno.EADDRINUSE:
                        raise
                    if not blocking:
                        return False
                    sleep(1)
                else:
                    self._socket = s
                    return True
        finally:
            # Do not leak the socket on any path that did not take the lock.
            if self._socket is None:
                s.close()

    def release(self):
        """Drop the lock by closing the bound socket."""
        s = self._socket
        del self._socket
        s.close()
class ClusterPdb(object):
    """Proxy to the debugger returned by ``debug.getPdb()``.

    A pipe holding one packed double acts as a lock shared by the
    processes that inherit it: reading the value takes the lock, writing
    a value back releases it. The value itself is a delay that grows by
    the time spent inside the debugger's ``interaction``.
    """
    # TODO: monkey-patch normal code not to timeout
    #       if another node is being debugged

    def __init__(self):
        self._r, self._w = os.pipe()
        # Put the initial token (no delay accumulated yet) in the pipe.
        self.release(0)

    def __getattr__(self, attr):
        """Forward unknown attributes to the lazily created debugger."""
        if '_debugger' in self.__dict__:
            debugger = self.__dict__['_debugger']
        else:
            self._debugger = debugger = debug.getPdb()

            def install(name):
                # Replace debugger.<name> by a wrapper that calls our
                # method of the same name, passing the original first.
                ours = getattr(self, name)
                theirs = getattr(debugger, name)

                def wrapper(*args, **kw):
                    return ours(theirs, *args, **kw)

                setattr(debugger, name, wraps(theirs)(wrapper))

            install('interaction')
        return getattr(debugger, attr)

    def acquire(self):
        """Take the token out of the pipe and return the delay it carries."""
        (delay,) = unpack('d', os.read(self._r, 8))
        return delay

    def release(self, delay):
        """Put a token carrying ``delay`` back into the pipe."""
        os.write(self._w, pack('d', delay))

    def sync(self):
        """Sleep as long as another process owns the lock"""
        delay = self.acquire()
        self.release(delay)
        return delay

    def interaction(self, hooked, *args, **kw):
        """Run the debugger's interaction while holding the token.

        The time spent in ``hooked`` is added to the delay written back.
        """
        carried = self.acquire()
        carried -= time()
        try:
            return hooked(*args, **kw)
        finally:
            self.release(carried + time())

    def wait(self, test, timeout, period):
        """Poll ``test`` every ``period`` seconds until it returns true.

        Return False once ``timeout``, extended by the delay read from
        the pipe, has elapsed.
        """
        deadline = time() + timeout
        while True:
            if test():
                return True
            if time() > deadline + self.sync():
                return False
            sleep(period)
# NOTE(review): this text looks like a diff whose +/- markers were stripped;
# the first assignment is presumably the removed line and the second its
# replacement - confirm before treating both as live code.
# Publish a ClusterPdb instance under the builtin name `pdb`.
__builtin__.pdb = ClusterPdb()
# Rebind builtin `pdb` to a callable that starts the debugger returned by
# debug.getPdb() in the frame of the caller.
__builtin__.pdb = lambda: debug.getPdb().set_trace(sys._getframe(1))
#
# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
# Julien Muchembled <jm@nexedi.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import __builtin__
import errno
import mmap
import os
import psutil
import signal
import socket
import sys
import tempfile
from cPickle import dumps, loads
from functools import wraps
from time import time, sleep
from neo.lib import debug
class SocketLock(object):
    """Basic system-wide lock

    The lock is held for as long as this object keeps a socket bound to
    ``address``: a second bind to the same address fails with EADDRINUSE,
    which is how contention is detected.
    """

    # Bound socket while the lock is held. Deleting the instance attribute
    # in release() makes lookups fall back to this class-level None.
    _socket = None

    def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
        """Remember what to bind; no socket is created here.

        For AF_UNIX, a NUL byte is prepended to ``address``.
        NOTE(review): presumably this selects the Linux abstract socket
        namespace (no filesystem entry) - confirm.
        """
        if family == socket.AF_UNIX:
            address = '\0' + address
        self.address = address
        self.socket_args = family, type

    def locked(self):
        """Return True if this object currently holds the lock."""
        return self._socket is not None

    def acquire(self, blocking=1):
        """Take the lock by binding a new socket to the address.

        Return True once the socket is bound. If the address is already
        in use, retry every second when ``blocking`` is true, otherwise
        return False. Any other socket error is propagated.
        """
        assert self._socket is None
        s = socket.socket(*self.socket_args)
        try:
            while True:
                try:
                    s.bind(self.address)
                except socket.error as e:
                    # 'as' syntax and .errno are valid on Python 2.6+ and 3,
                    # unlike the former "except X, e" / e[0] spelling.
                    if e.errno != errno.EADDRINUSE:
                        raise
                    if not blocking:
                        return False
                    sleep(1)
                else:
                    self._socket = s
                    return True
        finally:
            # Do not leak the socket on any path that did not take the lock.
            if self._socket is None:
                s.close()

    def release(self):
        """Drop the lock by closing the bound socket."""
        s = self._socket
        del self._socket
        s.close()
class ClusterDict(dict):
    """Simple storage (dict), shared with forked processes

    The content is kept pickled in a memory-mapped temporary file.
    A pipe containing a single byte is used as a lock: acquire() reads
    the byte and reloads the dict from the mapping, release() writes the
    byte back. Local changes reach the shared copy only through commit().
    """

    # Nesting counter: -1 when the lock is not held (its value once
    # __init__ has called release()), 0 when held once, and one more for
    # every nested acquire().
    _acquired = 0

    def __init__(self, *args, **kw):
        dict.__init__(self, *args, **kw)
        self._r, self._w = os.pipe()
        # shm_open(3) would be better but Python doesn't provide it.
        # See also http://nikitathespider.com/python/shm/
        f = tempfile.TemporaryFile()
        try:
            f.write(dumps(self.copy(), -1))
            f.flush()
            self._shared = mmap.mmap(f.fileno(), f.tell())
        finally:
            f.close()
        # Put the lock byte in the pipe.
        self.release()

    def __del__(self):
        try:
            os.close(self._r)
            os.close(self._w)
        except (AttributeError, TypeError):
            # AttributeError: __init__ failed before the pipe existed.
            # TypeError: if os.close is None
            pass

    def acquire(self):
        """Take the lock (re-entrant) and reload the shared content.

        Only the outermost call reads the lock byte and replaces the
        local content with the shared one.
        """
        self._acquired += 1
        if not self._acquired:
            os.read(self._r, 1)
            try:
                self.clear()
                shared = self._shared
                # Remap to the current size of the underlying file.
                shared.resize(shared.size())
                self.update(loads(shared[:]))
            except:
                # Whatever went wrong, give the lock back before re-raising.
                self.release()
                raise

    def release(self, commit=False):
        """Undo one acquire(); the outermost call gives the lock back.

        If ``commit`` is true, the outermost call first stores the local
        content with commit(); it is ignored by nested calls.
        """
        if not self._acquired:
            if commit:
                self.commit()
            # Bytes literal: same value on Python 2.6+, required on Python 3.
            os.write(self._w, b'\0')
        self._acquired -= 1

    def commit(self):
        """Overwrite the shared content with a pickle of the local dict."""
        shared = self._shared
        p = dumps(self.copy(), -1)
        shared.resize(len(p))
        shared[:] = p

# Module-level instance, created at import time.
cluster_dict = ClusterDict()
class ClusterPdb(object):
    """Multiprocess-aware wrapper around console and winpdb debuggers

    __call__ is the method to break.

    State shared through ``cluster_dict``:
    - 'text_pdb': pid of the process that currently owns the console
      debugger
    - 'last_pdb': maps a pid to None while that process is stopped in a
      debugger, otherwise to the time() recorded when it last left it

    TODO: monkey-patch normal code not to timeout
          if another node is being debugged
    """

    def __init__(self):
        # (id of code object, line number) of a call site -> number of
        # times __call__ was invoked from it with max_count set.
        self._count_dict = {}

    def __setattr__(self, name, value):
        """Install a hook, or fall back to a plain attribute assignment.

        When this object already has an attribute called ``name`` and
        ``value`` is a bound method, nothing is stored here: the method is
        replaced on its owner by a wrapper that calls our ``name``
        attribute with the original method as first argument.
        Any AttributeError on the way results in a normal assignment.
        """
        try:
            hook = getattr(self, name)

            def hooked_method(*args, **kw):
                return hook(value, *args, **kw)

            setattr(value.im_self, value.__name__,
                    wraps(value)(hooked_method))
        except AttributeError:
            object.__setattr__(self, name, value)

    @property
    def broken_peer(self):
        """True if a process other than this one is stopped in a debugger"""
        return self._getLastPdb(os.getpid()) is None

    def __call__(self, max_count=None, depth=0, text=None):
        """Break into a debugger in the frame ``depth`` levels above caller

        max_count -- if true, break at most that many times per call site
        text -- if false, use winpdb (rpdb2); when rpdb2 cannot be
                imported, fall back to the console debugger only if text
                is None. If true, use the console debugger.
        """
        depth += 1
        if max_count:
            caller = sys._getframe(depth)
            site = id(caller.f_code), caller.f_lineno
            del caller
            hits = 1 + self._count_dict.get(site, 0)
            self._count_dict[site] = hits
            if max_count < hits:
                return
        if not text:
            try:
                import rpdb2
            except ImportError:
                if text is not None:
                    raise
            else:
                if rpdb2.g_debugger is None:
                    # Temporarily replace the state manager factory, so
                    # that set_state of the instance rpdb2 is about to
                    # create gets hooked by _rpdb2_set_state.
                    rpdb2_CStateManager = rpdb2.CStateManager

                    def CStateManager(*args, **kw):
                        rpdb2.CStateManager = rpdb2_CStateManager
                        manager = rpdb2.CStateManager(*args, **kw)
                        self._rpdb2_set_state = manager.set_state
                        return manager

                    rpdb2.CStateManager = CStateManager
                return debug.winpdb(depth)
        if '_debugger' in self.__dict__:
            debugger = self.__dict__['_debugger']
        else:
            assert 'rpdb2' not in sys.modules
            debugger = debug.getPdb()
            self._debugger = debugger
            # Goes through __setattr__: hooks debugger.interaction.
            self._bdb_interaction = debugger.interaction
        return debugger.set_trace(sys._getframe(depth))

    def kill(self, pid, sig):
        """Send ``sig`` to ``pid``, but not while it is/was being debugged

        As long as ``pid`` is listed in 'last_pdb', retry every second,
        until the user presses ^C to force the kill. Return without
        killing if the process is a zombie. Raise OSError(ESRCH) if the
        process does not exist.
        """
        force = []
        previous_handler = None
        try:
            while True:
                cluster_dict.acquire()
                try:
                    debugged = cluster_dict.get('last_pdb', {})
                    if force or pid not in debugged:
                        os.kill(pid, sig)
                        debugged.pop(pid, None)
                        cluster_dict.commit()
                        break
                    try:
                        if psutil.Process(pid).status == psutil.STATUS_ZOMBIE:
                            break
                    except psutil.NoSuchProcess:
                        raise OSError(errno.ESRCH, 'No such process')
                finally:
                    cluster_dict.release()
                if previous_handler is None:
                    # First unsuccessful round: let ^C force the kill.
                    previous_handler = signal.signal(signal.SIGINT,
                        lambda *args: force.append(None))
                    sys.stderr.write('Pid %u is/was debugged.'
                                     ' Press ^C to kill it...' % pid)
                sleep(1)
        finally:
            if previous_handler is not None:
                signal.signal(signal.SIGINT, previous_handler)
                if force:
                    sys.stderr.write('\n')

    def _lock_console(self):
        """Wait until no process owns the console, then register as owner"""
        while True:
            cluster_dict.acquire()
            try:
                if 'text_pdb' not in cluster_dict:
                    pid = os.getpid()
                    cluster_dict['text_pdb'] = pid
                    cluster_dict.setdefault('last_pdb', {})[pid] = None
                    cluster_dict.commit()
                    break
            finally:
                cluster_dict.release()
            sleep(0.5)

    def _unlock_console(self):
        """Give the console back and record when the debugger was left"""
        cluster_dict.acquire()
        try:
            owner = cluster_dict.pop('text_pdb')
            cluster_dict['last_pdb'][owner] = time()
            cluster_dict.commit()
        finally:
            cluster_dict.release()

    def _bdb_interaction(self, hooked, *args, **kw):
        """Hook for the console debugger: own the console while interacting"""
        self._lock_console()
        try:
            return hooked(*args, **kw)
        finally:
            self._unlock_console()

    def _rpdb2_set_state(self, hooked, state=None, *args, **kw):
        """Hook for rpdb2: mirror the debugger state into 'last_pdb'"""
        from rpdb2 import STATE_BROKEN, STATE_DETACHED
        cluster_dict.acquire()
        try:
            if state is None:
                state = hooked.im_self.get_state()
            debugged = cluster_dict.setdefault('last_pdb', {})
            pid = os.getpid()
            if state == STATE_DETACHED:
                debugged.pop(pid, None)
            else:
                debugged[pid] = time() if state != STATE_BROKEN else None
            return hooked(state=state, *args, **kw)
        finally:
            # Commit on release.
            cluster_dict.release(True)

    def _getLastPdb(self, *exclude):
        """Return the latest time recorded in 'last_pdb' (0 if there is none)

        Return None as soon as a pid not listed in ``exclude`` has a None
        entry.
        """
        latest = 0
        for pid, stamp in cluster_dict.get('last_pdb', {}).iteritems():
            if pid in exclude:
                continue
            if stamp is None:
                return None
            if latest < stamp:
                latest = stamp
        return latest

    def wait(self, test, timeout, period):
        """Poll ``test`` every ``period`` seconds until it returns true

        Return False when no process has a None entry in 'last_pdb' and
        the current time is past both the initial deadline and the latest
        recorded time plus ``timeout``.
        """
        end_time = time() + timeout
        while True:
            if test():
                return True
            cluster_dict.acquire()
            try:
                last_pdb = self._getLastPdb()
                if last_pdb is not None and \
                   time() > max(last_pdb + timeout, end_time):
                    return False
            finally:
                cluster_dict.release()
            sleep(period)
# Publish a single ClusterPdb instance under the builtin name `pdb`, so that
# it can be called from any module without an import.
__builtin__.pdb = ClusterPdb()
# Break into the debugger when the process receives SIGUSR2.
# NOTE(review): depth=2 presumably skips the frames of the lambda and of the
# debug.decorate wrapper so that the interrupted code is debugged - confirm.
signal.signal(signal.SIGUSR2, debug.decorate(lambda sig, frame: pdb(depth=2)))
......@@ -29,13 +29,15 @@ import unittest
import tempfile
import traceback
import threading
import psutil
import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SocketLock, getTempDirectory
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
from neo.tests.cluster import SocketLock
from neo.client.Storage import Storage
NEO_MASTER = 'neomaster'
......@@ -150,14 +152,10 @@ class NEOProcess(object):
def kill(self, sig=signal.SIGTERM):
    """Send ``sig`` to the process through ``pdb.kill``.

    ``pdb`` is not imported here: it is looked up as a global/builtin name.
    An OSError raised by the kill is printed and otherwise ignored.

    Raise AlreadyStopped if ``self.pid`` is false.
    """
    if self.pid:
        try:
            pdb.kill(self.pid, sig)
        except OSError:
            # print_exc() reports the exception being handled.
            # print_last() reads sys.last_type & co, which are only set
            # once an exception has reached the interactive prompt.
            traceback.print_exc()
    else:
        raise AlreadyStopped
......@@ -202,15 +200,9 @@ class NEOProcess(object):
def isAlive(self):
    """Return whether the process exists and is not a zombie."""
    try:
        process = psutil.Process(self.pid)
        alive = process.status != psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return False
    return alive
class NEOCluster(object):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment