Commit fa77a579 authored by Julien Muchembled's avatar Julien Muchembled

qa: add testrunner options to dump/check the format of network packets

With the switch to msgpack, there was no schema anymore whereas it was
sometimes used for both automatic conversion (e.g. the last argument of
AskStoreTransaction must now be explicitly cast to list) and type checking.

This somewhat reintroduces a kind of schema that:
- is used by the test suite for type checking
- can be generated automatically from the test suite
  when one change the procotol
parent 50fb7793
...@@ -257,13 +257,11 @@ class Packet(object): ...@@ -257,13 +257,11 @@ class Packet(object):
_answer = None _answer = None
_code = None _code = None
_id = None _id = None
allow_dict = False
nodelay = True nodelay = True
poll_thread = False poll_thread = False
def __init__(self, *args): def __init__(self, *args):
assert self._code is not None, "Packet class not registered" assert self._code is not None, "Packet class not registered"
assert self.allow_dict or dict not in map(type, args), args
self._args = args self._args = args
def setId(self, value): def setId(self, value):
...@@ -465,7 +463,7 @@ class Packets(dict): ...@@ -465,7 +463,7 @@ class Packets(dict):
that haven't been unlocked. that haven't been unlocked.
:nodes: M -> S :nodes: M -> S
""", allow_dict=True) """)
AskFinalTID, AnswerFinalTID = request(""" AskFinalTID, AnswerFinalTID = request("""
Return final tid if ttid has been committed, to recover from certain Return final tid if ttid has been committed, to recover from certain
...@@ -680,7 +678,7 @@ class Packets(dict): ...@@ -680,7 +678,7 @@ class Packets(dict):
If current_serial's data is current on storage. If current_serial's data is current on storage.
:nodes: C -> S :nodes: C -> S
""", allow_dict=True) """)
AskTIDsFrom, AnswerTIDsFrom = request(""" AskTIDsFrom, AnswerTIDsFrom = request("""
Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
...@@ -701,7 +699,7 @@ class Packets(dict): ...@@ -701,7 +699,7 @@ class Packets(dict):
specified. specified.
:nodes: ctl -> A -> M :nodes: ctl -> A -> M
""", error=True, allow_dict=True) """, error=True)
CheckPartition = notify(""" CheckPartition = notify("""
Ask a storage node to compare a partition with all other nodes. Ask a storage node to compare a partition with all other nodes.
...@@ -769,7 +767,7 @@ class Packets(dict): ...@@ -769,7 +767,7 @@ class Packets(dict):
no new data up to 'tid' for the given partition no new data up to 'tid' for the given partition
:nodes: M -> S :nodes: M -> S
""", allow_dict=True) """)
NotifyReplicationDone = notify(""" NotifyReplicationDone = notify("""
Notify the master node that a partition has been successfully Notify the master node that a partition has been successfully
...@@ -790,7 +788,7 @@ class Packets(dict): ...@@ -790,7 +788,7 @@ class Packets(dict):
and reply with the list of records we should not have. and reply with the list of records we should not have.
:nodes: S -> S :nodes: S -> S
""", allow_dict=True) """)
AddTransaction = notify(""" AddTransaction = notify("""
Send metadata of a transaction to a node that does not have them. Send metadata of a transaction to a node that does not have them.
......
...@@ -298,6 +298,13 @@ class TestRunner(BenchmarkRunner): ...@@ -298,6 +298,13 @@ class TestRunner(BenchmarkRunner):
x('-S', '--stop-on-success', action='store_true', default=None, x('-S', '--stop-on-success', action='store_true', default=None,
help='Opposite of --stop-on-error: stop as soon as a test' help='Opposite of --stop-on-error: stop as soon as a test'
' passes. Details about errors are not printed at exit.') ' passes. Details about errors are not printed at exit.')
x = parser.add_mutually_exclusive_group().add_argument
x('-p', '--dump-protocol', const=True,
dest='protocol', action='store_const',
help='Dump schema of protocol instead of checking it.')
x('-P', '--no-check-protocol', const=False,
dest='protocol', action='store_const',
help='Do not check schema of protocol.')
_('-r', '--readable-tid', action='store_true', _('-r', '--readable-tid', action='store_true',
help='Change master behaviour to generate readable TIDs for easier' help='Change master behaviour to generate readable TIDs for easier'
' debugging (rather than from current time).') ' debugging (rather than from current time).')
...@@ -347,6 +354,7 @@ Environment Variables: ...@@ -347,6 +354,7 @@ Environment Variables:
coverage = args.coverage, coverage = args.coverage,
cov_unit = args.cov_unit, cov_unit = args.cov_unit,
only = args.only, only = args.only,
protocol = args.protocol,
stop_on_success = args.stop_on_success, stop_on_success = args.stop_on_success,
readable_tid = args.readable_tid, readable_tid = args.readable_tid,
) )
...@@ -374,19 +382,26 @@ Environment Variables: ...@@ -374,19 +382,26 @@ Environment Variables:
self.__coverage.save() self.__coverage.save()
del self.__coverage del self.__coverage
orig(self, success) orig(self, success)
try: if config.protocol is False:
for _ in xrange(config.loop): from contextlib import nested
if config.unit: protocol_checker = nested()
runner.run('Unit tests', UNIT_TEST_MODULES, only) else:
if config.functional: from neo.tests.protocol_checker import protocolChecker
runner.run('Functional tests', FUNC_TEST_MODULES, only) protocol_checker = protocolChecker(config.protocol)
if config.zodb: with protocol_checker:
runner.run('ZODB tests', ZODB_TEST_MODULES, only) try:
except KeyboardInterrupt: for _ in xrange(config.loop):
config['mail_to'] = None if config.unit:
traceback.print_exc() runner.run('Unit tests', UNIT_TEST_MODULES, only)
except StopOnSuccess: if config.functional:
pass runner.run('Functional tests', FUNC_TEST_MODULES, only)
if config.zodb:
runner.run('ZODB tests', ZODB_TEST_MODULES, only)
except KeyboardInterrupt:
config['mail_to'] = None
traceback.print_exc()
except StopOnSuccess:
pass
if config.coverage: if config.coverage:
coverage.stop() coverage.stop()
if coverage.neotestrunner: if coverage.neotestrunner:
......
# generated by running the whole test suite with -p
AbortTransaction(p64,[int])
AcceptIdentification(NodeTypes,?int,int,int,?int)
AddObject(p64,p64,int,bin,bin,?p64)
AddPendingNodes([int])
AddTransaction(p64,bin,bin,bin,int,p64,[p64])
AnswerBeginTransaction(p64)
AnswerCheckCurrentSerial(?p64)
AnswerCheckSerialRange(int,bin,p64,bin,p64)
AnswerCheckTIDRange(int,bin,p64)
AnswerClusterState(ClusterStates)
AnswerFetchObjects(?,?p64,?p64,{:})
AnswerFetchTransactions(?,?p64,[])
AnswerFinalTID(p64)
AnswerInformationLocked(p64)
AnswerLastIDs(?p64,?p64)
AnswerLastTransaction(p64)
AnswerLockedTransactions({p64:?p64})
AnswerNewOIDs([p64])
AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)])
AnswerObject(p64,p64,?p64,?int,bin,bin,?p64)
AnswerObjectHistory(p64,[(p64,int)])
AnswerObjectUndoSerial({p64:(p64,?p64,bool)})
AnswerPack(bool)
AnswerPartitionList(int,[(int,[(int,CellStates)])])
AnswerPartitionTable(?int,[(int,[(int,CellStates)])])
AnswerPrimary(int)
AnswerRebaseObject(?(p64,p64,?(int,bin,bin)))
AnswerRebaseTransaction([p64])
AnswerRecovery(?int,?p64,?p64)
AnswerStoreObject(?p64)
AnswerStoreTransaction()
AnswerTIDs([p64])
AnswerTIDsFrom([p64])
AnswerTransactionFinished(p64,p64)
AnswerTransactionInformation(p64,bin,bin,bin,int,[p64])
AnswerUnfinishedTransactions(p64,[p64])
AnswerVoteTransaction()
AskBeginTransaction(?p64)
AskCheckCurrentSerial(p64,p64,p64)
AskCheckSerialRange(int,int,p64,p64,p64)
AskCheckTIDRange(int,int,p64,p64)
AskClusterState()
AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]})
AskFetchTransactions(int,int,p64,p64,[p64])
AskFinalTID(p64)
AskFinishTransaction(p64,[p64],[p64])
AskLastIDs()
AskLastTransaction()
AskLockInformation(p64,p64)
AskLockedTransactions()
AskNewOIDs(int)
AskNodeList(NodeTypes)
AskObject(p64,?p64,?p64)
AskObjectHistory(p64,int,int)
AskObjectUndoSerial(p64,p64,p64,[p64])
AskPack(p64)
AskPartitionList(int,int,?)
AskPartitionTable()
AskPrimary()
AskRebaseObject(p64,p64)
AskRebaseTransaction(p64,p64)
AskRecovery()
AskStoreObject(p64,p64,int,bin,bin,?p64,?p64)
AskStoreTransaction(p64,bin,bin,bin,[p64])
AskTIDs(int,int,int)
AskTIDsFrom(p64,p64,int,int)
AskTransactionInformation(p64)
AskUnfinishedTransactions([int])
AskVoteTransaction(p64)
CheckPartition(int,(bin,?(bin,int)),p64,p64)
CheckReplicas({int:?int},p64,?)
Error(int,bin)
FailedVote(p64,[int])
InvalidateObjects(p64,[p64])
NotPrimaryMaster(?int,[(bin,int)])
NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64)
NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)])
NotifyPartitionChanges(int,[(int,int,CellStates)])
NotifyPartitionCorrupted(int,[int])
NotifyReady()
NotifyRepair(int)
NotifyReplicationDone(int,p64)
NotifyTransactionFinished(p64,p64)
NotifyUnlockInformation(p64)
Ping()
Pong()
Repair([int],int)
Replicate(p64,bin,{int:?(bin,int)})
RequestIdentification(NodeTypes,?int,?(bin,int),bin,any,?float)
SendPartitionTable(?int,[(int,[(int,CellStates)])])
SetClusterState(ClusterStates)
SetNodeState(int,NodeStates)
StartOperation(bool)
StopOperation()
Truncate(p64)
TweakPartitionTable([])
ValidateTransaction(p64,p64)
# The use of ast is convoluted, and the result quite verbose,
# but that remains simpler than writing a parser from scratch.
import ast, os
from contextlib import contextmanager
from neo.lib.protocol import Packet, Enum
array = list, set, tuple
item = Enum.Item
class _ast(object):
def __getattr__(self, k):
v = lambda *args: getattr(ast, k)(lineno=0, col_offset=0, *args)
setattr(self, k, v)
return v
_ast = _ast()
class parseArgument(ast.NodeTransformer):
def visit_UnaryOp(self, node):
assert isinstance(node.op, ast.USub)
return _ast.Call(_ast.Name('option', ast.Load()),
[self.visit(node.operand)], [], None, None)
def visit_Name(self, node):
return _ast.Str(node.id.replace('_', '?'))
parseArgument = parseArgument().visit
class Argument(object):
merge = True
type = ''
option = False
@classmethod
def load(cls, arg):
arg = ast.parse(arg.rstrip()
.replace('?(', '-(').replace('?[', '-[').replace('?{', '-{')
.replace('?', '_').replace('[]', '[""]')
.replace('{:', '{"":').replace(':}', ':""}'),
mode="eval")
x = arg.body
name = x.func.id
arg.body = parseArgument(_ast.Tuple(x.args, ast.Load()))
return name, cls._load(eval(compile(arg, '', mode="eval"),
{'option': cls._option}))
@classmethod
def _load(cls, arg):
t = type(arg)
if t is cls:
return arg
x = object.__new__(cls)
if t is tuple:
x.type = map(cls._load, arg)
elif t is list:
x.type = cls._load(*arg),
elif t is dict:
(k, v), = arg.iteritems()
x.type = cls._load(k), cls._load(v)
else:
if arg.startswith('?'):
arg = arg[1:]
x.option = True
x.type = arg
return x
@classmethod
def _option(cls, arg):
arg = cls._load(arg)
arg.option = True
return arg
@classmethod
def _merge(cls, args):
if args:
x, = {cls(x) for x in args}
return x
return object.__new__(cls)
def __init__(self, arg, root=False):
if arg is None:
self.option = True
elif isinstance(arg, tuple) and (root or len(arg) > 1):
self.type = map(self.__class__, arg)
elif isinstance(arg, array):
self.type = self._merge(arg),
elif isinstance(arg, dict):
self.type = self._merge(arg), self._merge(arg.values())
else:
self.type = (('p64' if len(arg) == 8 else
'bin') if isinstance(arg, bytes) else
arg._enum._name if isinstance(arg, item) else
'str' if isinstance(arg, unicode) else
type(arg).__name__)
def __repr__(self):
x = self.type
if type(x) is tuple:
x = ('[%s]' if len(x) == 1 else '{%s:%s}') % x
elif type(x) is list:
x = '(%s)' % ','.join(map(repr, x))
return '?' + x if self.option else x
def __hash__(self):
return 0
def __eq__(self, other):
x = self.type
y = other.type
if x and y and x != 'any':
# Since we don't know whether an array is fixed-size record of
# heterogeneous values or a collection of homogeneous values,
# we end up with the following complicated heuristic.
t = type(x)
if t is tuple:
if len(x) == 1 and type(y) is list:
z = set(x)
z.update(y)
if len(z) == 1:
x = y = tuple(z)
if self.merge:
self.type = x
elif t is list:
if type(y) is tuple and len(y) == 1:
z = set(y)
z.update(x)
if len(z) == 1:
x = y = tuple(z)
if self.merge:
self.type = x
t = tuple
elif t is str is type(y) and {x, y}.issuperset(('bin', 'p64')):
x = y = 'bin'
if self.merge:
self.type = x
if not (t is type(y) and (t is not tuple or
len(x) == len(y)) and x == y):
if not self.merge:
return False
self.type = 'any'
if self.merge:
if not x:
self.type = y
if not self.option:
self.option = other.option
elif y and not x or other.option and not self.option:
return False
return True
class FrozenArgument(Argument):
merge = False
@contextmanager
def protocolChecker(dump):
x = 'Packet(p64,?[(bin,{int:})],{:?(?,[])},?{?:float})'
assert x == '%s%r' % Argument.load(x)
assert not (FrozenArgument([]) == Argument([0]))
path = os.path.join(os.path.dirname(__file__), 'protocol')
if dump:
import threading
from multiprocessing import Lock
lock = Lock()
schema = {}
pid = os.getpid()
r, w = os.pipe()
def _check(name, arg):
try:
schema[name] == arg
except KeyError:
schema[name] = arg
def check(name, args):
arg = Argument(args, True)
if pid == os.getpid():
_check(name, arg)
else:
with lock:
os.write(w, '%s%r\n' % (name, arg))
def check_thread(r):
for x in os.fdopen(r):
_check(*Argument.load(x))
check_thread = threading.Thread(target=check_thread, args=(r,))
check_thread.daemon = True
check_thread.start()
else:
with open(path) as p:
x = p.readline()
assert x[0] == '#', x
schema = dict(map(FrozenArgument.load, p))
def check(name, args):
arg = Argument(args, True)
if not (None is not schema.get(name) == arg):
raise Exception('invalid packet: %s%r' % (name, arg))
w = None
Packet_encode = Packet.__dict__['encode']
def encode(packet):
check(type(packet).__name__, packet._args)
return Packet_encode(packet)
Packet.encode = encode
try:
yield
finally:
Packet.encode = Packet_encode
if w:
os.close(w)
check_thread.join()
if dump:
with open(path, 'w') as p:
p.write('# generated by running the whole test suite with -p\n')
for x in sorted(schema.iteritems()):
p.write('%s%r\n' % x)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment