Commit 5467a295 authored by Kirill Smelkov's avatar Kirill Smelkov

Merge branch 'master' into t

* master:
  Enable coverage for neo.tests, which is useful to find dead code
  Remove unused imports, found by pylint
  TODO: tweak should be safer
  Various neoctl/neolog formatting improvements/fixes
  debug: extend 'pdb' example to optionally break on an arbitrary list of callables
  client: fix simultaneous (re)connections to the master
parents ef5e0a40 ed693968
......@@ -4,4 +4,8 @@ source = neo
omit =
neo/debug.py
neo/scripts/runner.py
neo/tests/*
[report]
exclude_lines =
pragma: no cover
if __name__ == .__main__.:
......@@ -22,8 +22,7 @@ from .handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, \
NodeTypes, NodeStates, Packets
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
class Application(BaseApplication):
......
......@@ -18,7 +18,6 @@ from ZODB import BaseStorage, ConflictResolution, POSException
from zope.interface import implementer
import ZODB.interfaces
from functools import wraps
from neo.lib import logging
from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
......
......@@ -19,7 +19,6 @@ from zlib import compress, decompress
from random import shuffle
import heapq
import time
import weakref
from functools import partial
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
......@@ -32,20 +31,17 @@ from persistent.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock, SimpleQueue
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.node import NodeManager
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from neo.lib.dispatcher import ForgottenPacket
from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import p64, u64, parseMasterList
from neo.lib.debug import register as registerLiveDebugger
CHECKED_SERIAL = object()
......@@ -138,11 +134,11 @@ class Application(ThreadedApplication):
def __getattr__(self, attr):
if attr in ('last_tid', 'pt'):
if self._connecting_to_master_node.locked():
if attr == 'last_tid':
return
else:
self._getMasterConnection()
self._getMasterConnection()
# XXX: There's still a risk that we get disconnected from the
# master at this precise moment and for 'pt', we'd raise
# AttributeError. Should we catch it and loop until it
# succeeds?
return self.__getattribute__(attr)
def log(self):
......
......@@ -17,8 +17,8 @@
from neo.lib import logging
from neo.lib.handler import MTEventHandler
from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeStates, Packets, ProtocolError
from neo.lib.util import dump, add64
from neo.lib.protocol import NodeStates, ProtocolError
from neo.lib.util import dump
from . import AnswerBaseHandler
from ..exception import NEOStorageError
......@@ -94,13 +94,14 @@ class PrimaryNotificationsHandler(MTEventHandler):
def answerLastTransaction(self, conn, ltid):
app = self.app
if app.last_tid != ltid:
app_last_tid = app.__dict__.get('last_tid', '')
if app_last_tid != ltid:
# Either we're connecting or we already know the last tid
# via invalidations.
assert app.master_conn is None, app.master_conn
app._cache_lock_acquire()
try:
if app.last_tid < ltid:
if app_last_tid < ltid:
app._cache.clear_current()
# In the past, we tried not to invalidate the
# Connection caches entirely, using the list of
......
......@@ -11,7 +11,15 @@ The prompt is accessible through network in case that the process is daemonized:
IF = 'pdb'
if IF == 'pdb':
import socket, sys, threading
# List of (module, callables) to break at.
# If empty, a thread is started with a breakpoint.
# All breakpoints are automatically removed on the first break,
# or when this module is reloaded.
BP = (#('ZODB.Connection', 'Connection.setstate'),
#('ZPublisher.Publish', 'publish_module_standard'),
)
import errno, socket, sys, threading, weakref
# Unfortunately, IPython does not always print to given stdout.
#from neo.lib.debug import getPdb
from pdb import Pdb as getPdb
......@@ -55,7 +63,7 @@ if IF == 'pdb':
self._socket.setblocking(1)
return False
def pdb(app_set):
def pdb():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# For better security, maybe we should use a unix socket.
......@@ -87,7 +95,53 @@ if IF == 'pdb':
app_set = ()
finally:
del f
threading.Thread(target=pdb, args=(app_set,)).start()
class setupBreakPoints(list):
def __init__(self, bp_list):
self._lock = threading.Lock()
for o, name in bp_list:
o = __import__(o, fromlist=1)
x = name.split('.')
name = x.pop()
for x in x:
o = getattr(o, x)
orig = getattr(o, name)
if orig.__module__ == __name__:
orig.__closure__[1].cell_contents._revert()
orig = getattr(o, name)
assert orig.__module__ != __name__, (o, name)
orig = getattr(orig, '__func__', orig)
self.append((o, name, orig))
setattr(o, name, self._wrap(orig))
print 'BP set on', orig
sys.stdout.flush()
self._hold = weakref.ref(pdb, self._revert)
def _revert(self, *_):
for x in self:
setattr(*x)
print 'BP removed on', x[2]
sys.stdout.flush()
del self[:]
def _wrap(self, orig):
return lambda *args, **kw: self(orig, *args, **kw)
def __call__(self, orig, *args, **kw):
stop = False
with self._lock:
if self:
stop = True
self._revert()
if stop:
pdb()
return orig(*args, **kw)
if BP:
setupBreakPoints(BP)
else:
threading.Thread(target=pdb).start()
elif IF == 'frames':
import sys, traceback
......
......@@ -236,7 +236,8 @@ class BaseConnection(object):
def _getReprInfo(self):
r = [
('uuid', uuid_str(self.getUUID())),
('address', '%s:%u' % self.addr if self.addr else '?'),
('address', ('[%s]:%s' if ':' in self.addr[0] else '%s:%s')
% self.addr if self.addr else '?'),
('handler', self.getHandler()),
]
connector = self.connector
......
......@@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect
from functools import wraps
def check_signature(reference, function):
# args, varargs, varkw, defaults
......
......@@ -221,8 +221,8 @@ class NEOLogger(Logger):
def _emit(self, r):
if type(r) is PacketRecord:
ip, port = r.addr
peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<',
uuid_str(r.uuid), ip, port)
peer = ('%s %s ([%s]:%s)' if ':' in ip else '%s %s (%s:%s)') % (
'>' if r.outgoing else '<', uuid_str(r.uuid), ip, port)
msg = r.msg
"""
pktcls = protocol.StaticRegistry[r.code]
......
......@@ -19,7 +19,8 @@ from os.path import exists, getsize
import json
from . import attributeTracker, logging
from .protocol import uuid_str, NodeTypes, NodeStates, ProtocolError
from .protocol import formatNodeList, uuid_str, \
NodeTypes, NodeStates, ProtocolError
class Node(object):
......@@ -161,10 +162,12 @@ class Node(object):
return self._identified
def __repr__(self):
return '<%s(uuid=%s, address=%s, state=%s, connection=%r%s) at %x>' % (
addr = self._address
return '<%s(uuid=%s%s, state=%s, connection=%r%s) at %x>' % (
self.__class__.__name__,
uuid_str(self._uuid),
self._address,
', address=' + ('[%s]:%s' if ':' in addr[0] else '%s:%s') % addr
if addr else '',
self._state,
self._connection,
'' if self._identified else ', not identified',
......@@ -448,16 +451,8 @@ class NodeManager(object):
def log(self):
logging.info('Node manager : %u nodes', len(self._node_set))
if self._node_set:
node_list = [(node, uuid_str(node.getUUID()))
for node in sorted(self._node_set)]
max_len = max(len(x[1]) for x in node_list)
for node, uuid in node_list:
address = node.getAddress() or ''
if address:
address = '%s:%d' % address
logging.info(' * %*s | %8s | %22s | %s',
max_len, uuid, node.getType(), address, node.getState())
logging.info('\n'.join(formatNodeList(
map(Node.asTuple, self._node_set), ' * ')))
@apply
def NODE_TYPE_MAPPING():
......
......@@ -1743,3 +1743,22 @@ def Errors():
registry_dict)(handler_method_name_dict)
Errors = Errors()
# Common helpers between the 'neo' module and 'neolog'.
from datetime import datetime
from operator import itemgetter
def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)):
if node_list:
node_list.sort(key=_sort_key)
node_list = [(
uuid_str(uuid), str(node_type),
('[%s]:%s' if ':' in addr[0] else '%s:%s')
% addr if addr else '', str(state),
str(id_timestamp and datetime.utcfromtimestamp(id_timestamp)))
for node_type, addr, uuid, state, id_timestamp in node_list]
t = ''.join('%%-%us | ' % max(len(x[i]) for x in node_list)
for i in xrange(len(node_list[0]) - 1))
return map((prefix + t + '%s').__mod__, node_list)
return ()
......@@ -21,7 +21,6 @@ from .connection import ConnectionClosed
from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher, ForgottenPacket
from .locking import SimpleQueue
from .protocol import Packets
class app_set(weakref.WeakSet):
......
......@@ -23,8 +23,8 @@ from neo.lib.bootstrap import BootstrapManager
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.node import NodeManager
from neo.lib.protocol import CellStates, ClusterStates, \
NodeStates, NodeTypes, Packets, uuid_str, INVALID_TID, ZERO_TID
from neo.lib.protocol import ClusterStates, \
NodeTypes, Packets, uuid_str, ZERO_TID
from neo.lib.util import add64, dump
from .app import StateChangedException
from .pt import PartitionTable
......
......@@ -16,7 +16,6 @@
from collections import defaultdict
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import ClusterStates, Packets, NodeStates
from .handlers import BaseServiceHandler
......
......@@ -14,11 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from operator import itemgetter
from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, ClusterStates, NodeTypes, \
UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import uuid_str, formatNodeList, \
ClusterStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
action_dict = {
'print': {
......@@ -71,15 +70,6 @@ class TerminalNeoCTL(object):
for (uuid, state) in cell_list))
for (offset, cell_list) in row_list)
def formatNodeList(self, node_list, _sort_key=itemgetter(2, 0, 1)):
if not node_list:
return 'Empty list!'
node_list.sort(key=_sort_key)
return '\n'.join(
'%s - %s - %s - %s' % (node_type, uuid_str(uuid),
address and '%s:%s' % address, state)
for node_type, address, uuid, state in node_list)
# Actual actions
def getLastIds(self, params):
"""
......@@ -94,7 +84,7 @@ class TerminalNeoCTL(object):
else:
loid, ltid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % (u64(loid))
return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %u" % \
return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \
(u64(ltid), timeStringFromTID(ltid), ptid)
def getPartitionRowList(self, params):
......@@ -129,7 +119,7 @@ class TerminalNeoCTL(object):
else:
node_type = None
node_list = self.neoctl.getNodeList(node_type=node_type)
return self.formatNodeList(node_list)
return '\n'.join(formatNodeList(node_list)) or 'Empty list!'
def getClusterState(self, params):
"""
......
......@@ -20,6 +20,7 @@
import bz2, gzip, errno, optparse, os, signal, sqlite3, sys, time
from bisect import insort
from logging import getLevelName
from functools import partial
comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile)
......@@ -93,6 +94,11 @@ class Log(object):
exec bz2.decompress(text) in g
for x in 'uuid_str', 'Packets', 'PacketMalformedError':
setattr(self, x, g[x])
try:
self.notifyNodeInformation = partial(g['formatNodeList'],
prefix=' ! ')
except KeyError:
self.notifyNodeInformation = None
try:
self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
(date,)).next()
......@@ -144,18 +150,6 @@ class Log(object):
def error(self, code, message):
return "%s (%s)" % (code, message),
def notifyNodeInformation(self, node_list):
node_list.sort(key=lambda x: x[2])
node_list = [(self.uuid_str(x[2]), str(x[0]),
'%s:%u' % x[1] if x[1] else '?', str(x[3]))
+ ((repr(x[4]),) if len(x) > 4 else ()) # BBB
for x in node_list]
if node_list:
t = ''.join(' %%%us |' % max(len(x[i]) for x in node_list)
for i in xrange(len(node_list[0]) - 1))
return map((' !' + t + ' %s').__mod__, node_list)
return ()
def emit_many(log_list):
log_list = [(log, iter(log).next) for log in log_list]
......
......@@ -17,7 +17,6 @@
import traceback
import unittest
import logging
import time
import sys
import os
......@@ -33,7 +32,6 @@ if filter(re.compile(r'--coverage$|-\w*c').match, sys.argv[1:]):
coverage.neotestrunner = []
coverage.start()
import neo
from neo.tests import getTempDirectory, __dict__ as neo_tests__dict__
from neo.tests.benchmark import BenchmarkRunner
......
......@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect, random, signal, sys
import inspect, random
from logging import getLogger, INFO, DEBUG
from optparse import OptionParser
from neo.lib import logging
......
......@@ -21,7 +21,6 @@ from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.connection import ListeningConnection
from neo.lib.exception import StoppedOperation, PrimaryFailure
from neo.lib.pt import PartitionTable
......
......@@ -28,8 +28,7 @@ from .manager import DatabaseManager
from neo.lib import logging, patch, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, CellStates, \
MAX_TID, ZERO_HASH, ZERO_OID, ZERO_TID
from neo.lib.protocol import BackendNotImplemented, MAX_TID
patch.speedupFileStorageTxnLookup()
......
......@@ -29,7 +29,7 @@ import struct
import time
from . import LOG_QUERIES
from .manager import CreationUndone, DatabaseManager, splitOIDField
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements
......
......@@ -21,9 +21,8 @@ import string
import traceback
from . import LOG_QUERIES
from .manager import CreationUndone, DatabaseManager, splitOIDField
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
......
......@@ -16,7 +16,7 @@
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets
from neo.lib.protocol import NodeTypes, NotReadyError, Packets
from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
from .storage import StorageOperationHandler
from .client import ClientOperationHandler, ClientReadOnlyOperationHandler
......
......@@ -16,7 +16,7 @@
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, ZERO_TID
from neo.lib.protocol import Packets, ZERO_TID
from . import BaseMasterHandler
......
......@@ -19,7 +19,7 @@ import unittest
from cPickle import dumps
from mock import Mock, ReturnValues
from ZODB.POSException import StorageTransactionError, UndoError, ConflictError
from .. import NeoUnitTestBase, buildUrlFromString, ADDRESS_TYPE
from .. import NeoUnitTestBase, buildUrlFromString
from neo.client.app import Application
from neo.client.cache import test as testCache
from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
......
......@@ -18,9 +18,6 @@ import unittest
from mock import Mock
from .. import NeoUnitTestBase
from neo.lib.node import NodeManager
from neo.lib.pt import PartitionTable
from neo.lib.protocol import NodeTypes
from neo.client.handlers.master import PrimaryBootstrapHandler
from neo.client.handlers.master import PrimaryNotificationsHandler, \
PrimaryAnswersHandler
from neo.client.exception import NEOStorageError
......
......@@ -16,7 +16,6 @@
import os, stat, time
from persistent import Persistent
from persistent.TimeStamp import TimeStamp
from BTrees.OOBTree import OOBTree
class Inode(OOBTree):
......
......@@ -26,7 +26,6 @@ from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
from .. import expectedFailure
from . import NEOCluster, NEOFunctionalTest
TREE_SIZE = 6
......
......@@ -16,7 +16,6 @@
import unittest
from collections import defaultdict
from mock import Mock
from .. import NeoUnitTestBase
from neo.lib.protocol import NodeStates, CellStates
from neo.lib.pt import PartitionTableException
......
......@@ -22,7 +22,7 @@ from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler
from neo.lib.exception import PrimaryFailure
from neo.lib.pt import PartitionTable
from neo.lib.protocol import CellStates, ProtocolError, Packets
from neo.lib.protocol import CellStates, Packets
class StorageMasterHandlerTests(NeoUnitTestBase):
......
......@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import MySQLdb
from MySQLdb import OperationalError
from mock import Mock
from neo.lib.exception import DatabaseFailure
from neo.lib.util import p64
......@@ -59,7 +59,6 @@ class StorageMySQLdbTests(StorageDBTests):
def test_query2(self):
# test the OperationalError exception
# fake object, raise exception during the first call
from MySQLdb import OperationalError
from MySQLdb.constants.CR import SERVER_GONE_ERROR
class FakeConn(object):
def query(*args):
......@@ -78,7 +77,6 @@ class StorageMySQLdbTests(StorageDBTests):
def test_query3(self):
# OperationalError > raise DatabaseFailure exception
from MySQLdb import OperationalError
class FakeConn(object):
def close(self):
pass
......
......@@ -16,9 +16,8 @@
import unittest
from mock import Mock
from neo.lib import protocol
from neo.lib.protocol import NodeTypes, NodeStates
from neo.lib.node import Node, NodeManager, MasterDB
from neo.lib.node import Node, MasterDB
from . import NeoUnitTestBase, getTempDirectory
from time import time
from os import chmod, mkdir, rmdir, unlink
......
......@@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from mock import Mock
from neo.lib.protocol import NodeStates, CellStates
from neo.lib.pt import Cell, PartitionTable, PartitionTableException
from . import NeoUnitTestBase
......
......@@ -18,7 +18,7 @@ from collections import deque
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from itertools import islice, izip_longest
import os, time, unittest
import os, unittest
import neo, transaction, ZODB
from neo.lib import logging
from neo.lib.util import u64
......
......@@ -17,6 +17,7 @@
from logging import getLogger, INFO, DEBUG
import random
import sys
import time
import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError
......@@ -33,7 +34,7 @@ from neo.lib.event import EventManager
from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64
from .. import Patch
from .. import expectedFailure, Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random
# dump log to stderr
......@@ -367,6 +368,33 @@ class ReplicationTests(NEOThreadedTest):
# TODO check tids
self.assertEqual(1, self.checkBackup(backup))
def testSafeTweak(self):
"""
Check that tweak always tries to keep a minimum of (replicas + 1)
readable cells, otherwise we have less/no redundancy as long as
replication has not finished.
"""
def changePartitionTable(orig, *args):
orig(*args)
sys.exit()
cluster = NEOCluster(partitions=3, replicas=1, storage_count=3)
s0, s1, s2 = cluster.storage_list
try:
cluster.start([s0, s1])
s2.start()
self.tic()
cluster.enableStorageList([s2])
# 2 UP_TO_DATE cells should become FEEDING,
# and be dropped only when the replication is done,
# so that 1 storage can still die without data loss.
with Patch(s0.dm, changePartitionTable=changePartitionTable):
cluster.neoctl.tweakPartitionTable()
self.tic()
expectedFailure(self.assertEqual)(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
cluster.stop()
def testReplicationAbortedBySource(self):
"""
Check that a feeding node aborts replication when its partition is
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment