Commit 71843e90 authored by Kirill Smelkov

Merge branch 'master' into t

* master:
  storage: fix crash when a client loses connection to the master just before voting
  no change: only some code reindentation
  Enable branch coverage measurement by default
  coverage: add support for functional tests
parents 5f5d6516 15472c62
No related merge requests found
[run]
branch = True
source = neo
omit =
neo/debug.py
......
......@@ -647,6 +647,10 @@ class Application(ThreadedApplication):
txn_context['voted'] = None
# We must not go further if connection to master was lost since
# tpc_begin, to lower the probability of failing during tpc_finish.
# IDEA: We can improve in 2 opposite directions:
# - In the case of big transactions, it would be useful to
# also detect failures earlier.
# - If possible, recover from master failure.
if 'error' in txn_context:
raise NEOStorageError(txn_context['error'])
return result
......
......@@ -93,51 +93,50 @@ class PrimaryNotificationsHandler(MTEventHandler):
app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid):
app = self.app
if app.last_tid != ltid:
# Either we're connecting or we already know the last tid
# via invalidations.
assert app.master_conn is None, app.master_conn
if 1:
app._cache_lock_acquire()
try:
if app.last_tid < ltid:
app._cache.clear_current()
# In the past, we tried not to invalidate the
# Connection caches entirely, using the list of
# oids that are invalidated by clear_current.
# This was wrong because these caches may have
# entries that are not in the NEO cache anymore.
else:
# The DB was truncated. It happens so
# rarely that we don't need to optimize.
app._cache.clear()
# Make sure a parallel load won't refill the cache
# with garbage.
app._loading_oid = app._loading_invalidated = None
finally:
app._cache_lock_release()
db = app.getDB()
db is None or db.invalidateCache()
app.last_tid = ltid
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
app = self.app
app.last_tid = tid
# Update cache
cache = app._cache
app = self.app
if app.last_tid != ltid:
# Either we're connecting or we already know the last tid
# via invalidations.
assert app.master_conn is None, app.master_conn
app._cache_lock_acquire()
try:
for oid, data in cache_dict.iteritems():
# Update ex-latest value in cache
cache.invalidate(oid, tid)
if data is not None:
# Store in cache with no next_tid
cache.store(oid, data, tid, None)
if callback is not None:
callback(tid)
if app.last_tid < ltid:
app._cache.clear_current()
# In the past, we tried not to invalidate the
# Connection caches entirely, using the list of
# oids that are invalidated by clear_current.
# This was wrong because these caches may have
# entries that are not in the NEO cache anymore.
else:
# The DB was truncated. It happens so
# rarely that we don't need to optimize.
app._cache.clear()
# Make sure a parallel load won't refill the cache
# with garbage.
app._loading_oid = app._loading_invalidated = None
finally:
app._cache_lock_release()
db = app.getDB()
db is None or db.invalidateCache()
app.last_tid = ltid
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
# Handle the answer telling that a transaction was finished with `tid`.
#
# tid        -- recorded as the application's last known tid.
# callback   -- if not None, called with `tid` before the cache lock is
#               released (the call sits between `try:` and `finally:`).
# cache_dict -- iterated with iteritems() (Python 2 dict API) as
#               (oid, data) pairs; `data` may be None.
# `conn` and the `_` parameter are not used in this method.
#
# NOTE(review): indentation is lost in this view, so whether the callback
# runs inside the loop or once after it cannot be confirmed from here.
app = self.app
app.last_tid = tid
# Update cache
cache = app._cache
# Everything down to `finally:` runs between the acquire/release pair.
app._cache_lock_acquire()
try:
for oid, data in cache_dict.iteritems():
# Update ex-latest value in cache
cache.invalidate(oid, tid)
if data is not None:
# Store in cache with no next_tid
cache.store(oid, data, tid, None)
if callback is not None:
callback(tid)
finally:
# Always release the cache lock, even if an update or the callback raised.
app._cache_lock_release()
def connectionClosed(self, conn):
app = self.app
......
......@@ -30,6 +30,7 @@ if filter(re.compile(r'--coverage$|-\w*c').match, sys.argv[1:]):
# Start coverage as soon as possible.
import coverage
coverage = coverage.Coverage()
coverage.neotestrunner = []
coverage.start()
import neo
......@@ -212,7 +213,7 @@ class TestRunner(BenchmarkRunner):
def add_options(self, parser):
parser.add_option('-c', '--coverage', action='store_true',
help='Enable coverage (not working yet for functional tests)')
help='Enable coverage')
parser.add_option('-f', '--functional', action='store_true',
help='Functional tests')
parser.add_option('-u', '--unit', action='store_true',
......@@ -269,6 +270,8 @@ Environment Variables:
traceback.print_exc()
if config.coverage:
coverage.stop()
if coverage.neotestrunner:
coverage.combine(coverage.neotestrunner)
coverage.save()
# build report
self._successful = runner.wasSuccessful()
......
......@@ -162,7 +162,10 @@ class TransactionManager(object):
Store transaction information received from client node
"""
logging.debug('Vote TXN %s', dump(ttid))
transaction = self._transaction_dict[ttid]
try:
transaction = self._transaction_dict[ttid]
except KeyError:
raise ProtocolError("unknown ttid %s" % dump(ttid))
object_list = transaction.getObjectList()
if txn_info:
user, desc, ext, oid_list = txn_info
......@@ -195,7 +198,10 @@ class TransactionManager(object):
"""
Unlock transaction
"""
tid = self._transaction_dict[ttid].getTID()
try:
tid = self._transaction_dict[ttid].getTID()
except KeyError:
raise ProtocolError("unknown ttid %s" % dump(ttid))
logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid))
dm = self._app.dm
dm.unlockTransaction(tid, ttid)
......
......@@ -42,6 +42,11 @@ from .. import ADDRESS_TYPE, DB_SOCKET, DB_USER, IP_VERSION_FORMAT_DICT, SSL, \
from neo.client.Storage import Storage
from neo.storage.database import buildDatabaseManager
try:
coverage = sys.modules['neo.scripts.runner'].coverage
except (AttributeError, KeyError):
coverage = None
command_dict = {
NodeTypes.MASTER: 'neomaster',
NodeTypes.STORAGE: 'neostorage',
......@@ -111,6 +116,10 @@ class PortAllocator(object):
class NEOProcess(object):
_coverage_fd = None
_coverage_prefix = os.path.join(getTempDirectory(), 'coverage-')
_coverage_index = 0
pid = 0
def __init__(self, command, uuid, arg_dict):
......@@ -136,12 +145,40 @@ class NEOProcess(object):
args.append(str(param))
if with_uuid:
args += '--uuid', str(self.uuid)
global coverage
if coverage:
cls = self.__class__
cls._coverage_index += 1
coverage_data_path = cls._coverage_prefix + str(cls._coverage_index)
self._coverage_fd, w = os.pipe()
def save_coverage(*args):
if coverage:
coverage.stop()
coverage.save()
if args:
os.close(w)
os.kill(os.getpid(), signal.SIGSTOP)
self.pid = os.fork()
if self.pid == 0:
if self.pid:
# Wait until the child has installed its signal handlers.
os.close(w)
os.read(self._coverage_fd, 1)
if coverage:
coverage.neotestrunner.append(coverage_data_path)
else:
# Child
try:
# release SQLite debug log
logging.setup()
signal.signal(signal.SIGTERM, lambda *args: sys.exit())
if coverage:
coverage.stop()
from coverage import Coverage
coverage = Coverage(coverage_data_path)
coverage.start()
signal.signal(signal.SIGUSR2, save_coverage)
os.close(self._coverage_fd)
os.write(w, '\0')
sys.argv = [command] + args
getattr(neo.scripts, command).main()
status = 0
......@@ -158,6 +195,7 @@ class NEOProcess(object):
# prevent child from killing anything (cf __del__), or
# running any other cleanup code normally done by the parent
try:
save_coverage()
os._exit(status)
except:
print >>sys.stderr, status
......@@ -166,6 +204,15 @@ class NEOProcess(object):
logging.info('pid %u: %s %s',
self.pid, command, ' '.join(map(repr, args)))
def child_coverage(self):
# Synchronize with the child process through the coverage pipe, then
# dispose of the parent's read end.
#
# Does nothing when no read end is pending (`_coverage_fd` is None).
# Otherwise performs a 1-byte os.read() on the descriptor, closes it and
# removes the instance attribute.
r = self._coverage_fd
if r is not None:
try:
# Blocking read on the pipe; presumably returns once the child writes
# a byte or closes its write end - confirm against save_coverage.
os.read(r, 1)
finally:
os.close(r)
# Drop the instance attribute so that lookups fall back to the
# class-level `_coverage_fd`.
# NOTE(review): indentation is lost in this view; whether this `del` is
# part of the `finally` clause cannot be confirmed from here.
del self._coverage_fd
def kill(self, sig=signal.SIGTERM):
if self.pid:
logging.info('kill pid %u', self.pid)
......@@ -186,12 +233,14 @@ class NEOProcess(object):
# guaranteed way to handle them (other objects we would depend on
# might already have been deleted).
pass
assert self._coverage_fd is None, self._coverage_fd
def wait(self, options=0):
def wait(self):
if self.pid == 0:
raise AlreadyStopped
result = os.WEXITSTATUS(os.waitpid(self.pid, options)[1])
result = os.WEXITSTATUS(os.waitpid(self.pid, 0)[1])
self.pid = 0
self.child_coverage()
if result:
raise NodeProcessError('%r %r exited with status %r' % (
self.command, self.arg_dict, result))
......@@ -384,10 +433,12 @@ class NEOCluster(object):
for process_list in self.process_dict.itervalues():
for process in process_list:
try:
process.kill(signal.SIGSTOP)
process.kill(signal.SIGUSR2)
stopped_list.append(process)
except AlreadyStopped:
pass
for process in stopped_list:
process.child_coverage()
error_list = []
for process in stopped_list:
try:
......
......@@ -1135,6 +1135,25 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testMasterFailureBeforeVote(self):
# Check that committing fails with POSException.StorageError, and that the
# cluster is still reported as RUNNING, when the master-side connection is
# closed right after the client's waitStoreResponses has returned.
def waitStoreResponses(orig, *args):
# Patched replacement: run the original first, then close the connection
# that the master holds for orig.__self__ (presumably the client
# application - confirm), call self.tic() and return the original result.
result = orig(*args)
# Single-item unpacking: exactly one matching connection is expected.
m2c, = cluster.master.getConnectionList(orig.__self__)
m2c.close()
self.tic()
return result
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
t, c = cluster.getTransaction()
c.root()['x'] = PCounter() # 1 store() to each storage
with Patch(cluster.client, waitStoreResponses=waitStoreResponses):
# The commit must fail because of the connection closed by the patch.
self.assertRaises(POSException.StorageError, t.commit)
# The failed commit must not have taken the cluster out of RUNNING.
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
# Always stop the cluster, whatever the outcome of the test.
cluster.stop()
def testEmptyTransaction(self):
cluster = NEOCluster()
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment