Commit a63b45fe authored by Julien Muchembled's avatar Julien Muchembled

mysql: fix remaining places where a server disconnection was not catched

parent fec86e26
...@@ -26,9 +26,6 @@ class PrimaryFailure(NeoException): ...@@ -26,9 +26,6 @@ class PrimaryFailure(NeoException):
class StoppedOperation(NeoException): class StoppedOperation(NeoException):
pass pass
class DatabaseFailure(NeoException):
pass
class NodeNotReady(NeoException): class NodeNotReady(NeoException):
pass pass
...@@ -22,14 +22,14 @@ def check_signature(reference, function): ...@@ -22,14 +22,14 @@ def check_signature(reference, function):
a, b, c, d = inspect.getargspec(function) a, b, c, d = inspect.getargspec(function)
x = len(A) - len(a) x = len(A) - len(a)
if x < 0: # ignore extra default parameters if x < 0: # ignore extra default parameters
if x + len(d) < 0: if B or x + len(d) < 0:
return False return False
del a[x:] del a[x:]
d = d[:x] or None d = d[:x] or None
elif x: # different signature elif x: # different signature
# We have no need yet to support methods with default parameters. # We have no need yet to support methods with default parameters.
return a == A[:-x] and (b or a and c) and not (d or D) return a == A[:-x] and (b or a and c) and not (d or D)
return a == A and b == B and c == C and d == D return a == A and (b or not B) and (c or not C) and d == D
def implements(obj, ignore=()): def implements(obj, ignore=()):
ignore = set(ignore) ignore = set(ignore)
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
LOG_QUERIES = False LOG_QUERIES = False
from neo.lib.exception import DatabaseFailure
DATABASE_MANAGER_DICT = { DATABASE_MANAGER_DICT = {
'Importer': 'importer.ImporterDatabaseManager', 'Importer': 'importer.ImporterDatabaseManager',
'MySQL': 'mysqldb.MySQLDatabaseManager', 'MySQL': 'mysqldb.MySQLDatabaseManager',
...@@ -33,3 +31,6 @@ def getAdapterKlass(name): ...@@ -33,3 +31,6 @@ def getAdapterKlass(name):
def buildDatabaseManager(name, args=(), kw={}): def buildDatabaseManager(name, args=(), kw={}):
return getAdapterKlass(name)(*args, **kw) return getAdapterKlass(name)(*args, **kw)
class DatabaseFailure(Exception):
pass
...@@ -23,10 +23,9 @@ from ConfigParser import SafeConfigParser ...@@ -23,10 +23,9 @@ from ConfigParser import SafeConfigParser
from ZODB.config import storageFromString from ZODB.config import storageFromString
from ZODB.POSException import POSKeyError from ZODB.POSException import POSKeyError
from . import buildDatabaseManager from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager from .manager import DatabaseManager
from neo.lib import compress, logging, patch, util from neo.lib import compress, logging, patch, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, MAX_TID from neo.lib.protocol import BackendNotImplemented, MAX_TID
......
...@@ -19,9 +19,9 @@ from collections import defaultdict ...@@ -19,9 +19,9 @@ from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from functools import wraps from functools import wraps
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import abstract, requires from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import CellStates, NonReadableCell, ZERO_TID from neo.lib.protocol import CellStates, NonReadableCell, ZERO_TID
from . import DatabaseFailure
def lazymethod(func): def lazymethod(func):
def getter(self): def getter(self):
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
from binascii import a2b_hex from binascii import a2b_hex
from collections import OrderedDict from collections import OrderedDict
from functools import wraps
import MySQLdb import MySQLdb
from MySQLdb import DataError, IntegrityError, \ from MySQLdb import DataError, IntegrityError, \
OperationalError, ProgrammingError OperationalError, ProgrammingError
...@@ -33,18 +34,55 @@ import struct ...@@ -33,18 +34,55 @@ import struct
import sys import sys
import time import time
from . import LOG_QUERIES from . import LOG_QUERIES, DatabaseFailure
from .manager import DatabaseManager, splitOIDField from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
class MysqlError(DatabaseFailure):
def __init__(self, exc, query=None):
self.exc = exc
self.query = query
code = property(lambda self: self.exc.args[0])
def __str__(self):
msg = 'MySQL error %s: %s' % self.exc.args
return msg if self.query is None else '%s\nQuery: %s' % (
msg, getPrintableQuery(self.query[:1000]))
def getPrintableQuery(query, max=70): def getPrintableQuery(query, max=70):
return ''.join(c if c in string.printable and c not in '\t\x0b\x0c\r' return ''.join(c if c in string.printable and c not in '\t\x0b\x0c\r'
else '\\x%02x' % ord(c) for c in query) else '\\x%02x' % ord(c) for c in query)
def auto_reconnect(wrapped):
def wrapper(self, *args):
# Try 3 times at most. When it fails too often for the same
# query then the disconnection is likely caused by this query.
# We don't want to enter into an infinite loop.
retry = 2
while 1:
try:
return wrapped(self, *args)
except OperationalError as m:
# IDEA: Is it safe to retry in case of DISK_FULL ?
# XXX: However, this would another case of failure that would
# be unnoticed by other nodes (ADMIN & MASTER). When
# there are replicas, it may be preferred to not retry.
if (self._active
or SERVER_GONE_ERROR != m.args[0] != SERVER_LOST
or not retry):
raise MysqlError(m, *args)
logging.info('the MySQL server is gone; reconnecting')
assert not self._deferred
self.close()
retry -= 1
return wraps(wrapped)(wrapper)
@implements @implements
class MySQLDatabaseManager(DatabaseManager): class MySQLDatabaseManager(DatabaseManager):
...@@ -65,9 +103,18 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -65,9 +103,18 @@ class MySQLDatabaseManager(DatabaseManager):
'(?:([^:]+)(?::(.*))?@)?([^~./]+)(.+)?$', database).groups() '(?:([^:]+)(?::(.*))?@)?([^~./]+)(.+)?$', database).groups()
def _close(self): def _close(self):
self.conn.close() try:
conn = self.__dict__.pop('conn')
except KeyError:
return
conn.close()
def __getattr__(self, attr):
if attr == 'conn':
self._tryConnect()
return DatabaseManager.__getattr__(self, attr)
def _connect(self): def _tryConnect(self):
kwd = {'db' : self.db, 'user' : self.user} kwd = {'db' : self.db, 'user' : self.user}
if self.passwd is not None: if self.passwd is not None:
kwd['passwd'] = self.passwd kwd['passwd'] = self.passwd
...@@ -118,43 +165,26 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -118,43 +165,26 @@ class MySQLDatabaseManager(DatabaseManager):
% (name, self._max_allowed_packet // 1024)) % (name, self._max_allowed_packet // 1024))
self._max_allowed_packet = int(value) self._max_allowed_packet = int(value)
_connect = auto_reconnect(_tryConnect)
def _commit(self): def _commit(self):
self.conn.commit() self.conn.commit()
self._active = 0 self._active = 0
@auto_reconnect
def query(self, query): def query(self, query):
"""Query data from a database.""" """Query data from a database."""
if LOG_QUERIES: if LOG_QUERIES:
logging.debug('querying %s...', logging.debug('querying %s...',
getPrintableQuery(query.split('\n', 1)[0][:70])) getPrintableQuery(query.split('\n', 1)[0][:70]))
# Try 3 times at most. When it fails too often for the same conn = self.conn
# query then the disconnection is likely caused by this query. conn.query(query)
# We don't want to enter into an infinite loop. if query.startswith("SELECT "):
retry = 2 r = conn.store_result()
while 1: return tuple([
conn = self.conn tuple([d.tostring() if isinstance(d, array) else d
try: for d in row])
conn.query(query) for row in r.fetch_row(r.num_rows())])
if query.startswith("SELECT "):
r = conn.store_result()
return tuple([
tuple([d.tostring() if isinstance(d, array) else d
for d in row])
for row in r.fetch_row(r.num_rows())])
break
except OperationalError as m:
code, m = m.args
# IDEA: Is it safe to retry in case of DISK_FULL ?
# XXX: However, this would another case of failure that would
# be unnoticed by other nodes (ADMIN & MASTER). When
# there are replicas, it may be preferred to not retry.
if self._active or SERVER_GONE_ERROR != code != SERVER_LOST \
or not retry:
raise DatabaseFailure('MySQL error %d: %s\nQuery: %s'
% (code, m, getPrintableQuery(query[:1000])))
logging.info('the MySQL server is gone; reconnecting')
self._connect()
retry -= 1
r = query.split(None, 1)[0] r = query.split(None, 1)[0]
if r in ("INSERT", "REPLACE", "DELETE", "UPDATE"): if r in ("INSERT", "REPLACE", "DELETE", "UPDATE"):
self._active = 1 self._active = 1
...@@ -444,9 +474,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -444,9 +474,9 @@ class MySQLDatabaseManager(DatabaseManager):
PARTITION p%u VALUES IN (%u))""" % (offset, offset) PARTITION p%u VALUES IN (%u))""" % (offset, offset)
for table in 'trans', 'obj': for table in 'trans', 'obj':
try: try:
self.conn.query(add % table) self.query(add % table)
except OperationalError as e: except MysqlError as e:
if e.args[0] != SAME_NAME_PARTITION: if e.code != SAME_NAME_PARTITION:
raise raise
def dropPartitions(self, offset_list): def dropPartitions(self, offset_list):
...@@ -468,9 +498,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -468,9 +498,9 @@ class MySQLDatabaseManager(DatabaseManager):
','.join(' p%u' % i for i in offset_list) ','.join(' p%u' % i for i in offset_list)
for table in 'trans', 'obj': for table in 'trans', 'obj':
try: try:
self.conn.query(drop % table) self.query(drop % table)
except OperationalError as e: except MysqlError as e:
if e.args[0] != DROP_LAST_PARTITION: if e.code != DROP_LAST_PARTITION:
raise raise
def _getUnfinishedDataIdList(self): def _getUnfinishedDataIdList(self):
......
...@@ -15,14 +15,14 @@ ...@@ -15,14 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from MySQLdb import NotSupportedError, OperationalError from MySQLdb import NotSupportedError, OperationalError, ProgrammingError
from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE
from ..mock import Mock from ..mock import Mock
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_OID from neo.lib.protocol import ZERO_OID
from neo.lib.util import p64 from neo.lib.util import p64
from .. import DB_PREFIX, DB_SOCKET, DB_USER from .. import DB_PREFIX, DB_SOCKET, DB_USER, Patch
from .testStorageDBTests import StorageDBTests from .testStorageDBTests import StorageDBTests
from neo.storage.database import DatabaseFailure
from neo.storage.database.mysqldb import MySQLDatabaseManager from neo.storage.database.mysqldb import MySQLDatabaseManager
...@@ -72,18 +72,11 @@ class StorageMySQLdbTests(StorageDBTests): ...@@ -72,18 +72,11 @@ class StorageMySQLdbTests(StorageDBTests):
from MySQLdb.constants.CR import SERVER_GONE_ERROR from MySQLdb.constants.CR import SERVER_GONE_ERROR
class FakeConn(object): class FakeConn(object):
def query(*args): def query(*args):
p.revert()
raise OperationalError(SERVER_GONE_ERROR, 'this is a test') raise OperationalError(SERVER_GONE_ERROR, 'this is a test')
self.db.conn = FakeConn() with Patch(self.db, conn=FakeConn()) as p:
self.connect_called = False self.assertRaises(ProgrammingError, self.db.query, 'QUERY')
def connect_hook(): self.assertFalse(p.applied)
# mock object, break raise/connect loop
self.db.conn = Mock()
self.connect_called = True
self.db._connect = connect_hook
# make a query, exception will be raised then connect() will be
# called and the second query will use the mock object
self.db.query('INSERT')
self.assertTrue(self.connect_called)
def test_query3(self): def test_query3(self):
# OperationalError > raise DatabaseFailure exception # OperationalError > raise DatabaseFailure exception
......
...@@ -30,7 +30,7 @@ from ZODB.DB import TransactionalUndo ...@@ -30,7 +30,7 @@ from ZODB.DB import TransactionalUndo
from neo.storage.transactions import TransactionManager, ConflictError from neo.storage.transactions import TransactionManager, ConflictError
from neo.lib.connection import ConnectionClosed, \ from neo.lib.connection import ConnectionClosed, \
ServerConnection, MTClientConnection ServerConnection, MTClientConnection
from neo.lib.exception import DatabaseFailure, StoppedOperation from neo.lib.exception import StoppedOperation
from neo.lib.handler import DelayEvent, EventHandler from neo.lib.handler import DelayEvent, EventHandler
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes, from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
...@@ -42,6 +42,7 @@ from neo.lib.util import add64, makeChecksum, p64, u64 ...@@ -42,6 +42,7 @@ from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.transactions import Transaction from neo.client.transactions import Transaction
from neo.master.handlers.client import ClientServiceHandler from neo.master.handlers.client import ClientServiceHandler
from neo.storage.database import DatabaseFailure
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.identification import IdentificationHandler from neo.storage.handlers.identification import IdentificationHandler
from neo.storage.handlers.initialization import InitializationHandler from neo.storage.handlers.initialization import InitializationHandler
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment