Commit c2cc0c11 by Julien Muchembled

mysql: fail instead of silently reconnect if there is any pending change

1 parent 069b95e5
......@@ -82,13 +82,13 @@ class MySQLDatabaseManager(DatabaseManager):
while True:
try:
self.conn = MySQLdb.connect(**kwd)
break
except Exception:
if timeout_at is not None and time.time() >= timeout_at:
raise
logging.exception('Connection to MySQL failed, retrying.')
time.sleep(1)
else:
break
self._active = 0
self.conn.autocommit(False)
self.conn.query("SET SESSION group_concat_max_len = %u" % (2**32-1))
self.conn.set_sql_mode("TRADITIONAL,NO_ENGINE_SUBSTITUTION")
......@@ -96,6 +96,7 @@ class MySQLDatabaseManager(DatabaseManager):
def commit(self):
logging.debug('committing...')
self.conn.commit()
self._active = 0
def query(self, query):
"""Query data from a database."""
......@@ -111,25 +112,23 @@ class MySQLDatabaseManager(DatabaseManager):
logging.debug('querying %s...', query_part)
conn.query(query)
r = conn.store_result()
if r is not None:
new_r = []
for row in r.fetch_row(r.num_rows()):
new_row = []
for d in row:
if isinstance(d, array):
d = d.tostring()
new_row.append(d)
new_r.append(tuple(new_row))
r = tuple(new_r)
if query.startswith("SELECT "):
r = conn.store_result()
return tuple([
tuple([d.tostring() if isinstance(d, array) else d
for d in row])
for row in r.fetch_row(r.num_rows())])
r = query.split(None, 1)[0]
if r in ("INSERT", "REPLACE", "DELETE", "UPDATE"):
self._active = 1
else:
assert r in ("ALTER", "CREATE", "DROP", "TRUNCATE"), query
except OperationalError, m:
if m[0] in (SERVER_GONE_ERROR, SERVER_LOST):
if m[0] in (SERVER_GONE_ERROR, SERVER_LOST) and not self._active:
logging.info('the MySQL server is gone; reconnecting')
self._connect()
return self.query(query)
raise DatabaseFailure('MySQL error %d: %s' % (m[0], m[1]))
return r
@property
def escape(self):
......
......@@ -75,14 +75,14 @@ class StorageTests(NEOFunctionalTest):
# wait for the sql transaction to be commited
def callback(last_try):
# One revision per object and two for the root, before and after
(object_number,), = db.query('select count(*) from obj')
(object_number,), = db.query('SELECT count(*) FROM obj')
return object_number == OBJECT_NUMBER + 2, object_number
self.neo.expectCondition(callback)
# no more temporarily objects
(t_objects,), = db.query('select count(*) from tobj')
(t_objects,), = db.query('SELECT count(*) FROM tobj')
self.assertEqual(t_objects, 0)
# One object more for the root
query = 'select count(*) from (select * from obj group by oid) as t'
query = 'SELECT count(*) FROM (SELECT * FROM obj GROUP BY oid) AS t'
(objects,), = db.query(query)
self.assertEqual(objects, OBJECT_NUMBER + 1)
# Check object content
......
......@@ -55,11 +55,11 @@ class StorageMySQSLdbTests(StorageDBTests):
(1, 2, '\x01\x02', ),
)
self.db.conn = Mock({ 'store_result': result_object })
result = self.db.query('QUERY')
result = self.db.query('SELECT ')
self.assertEqual(result, expected_result)
calls = self.db.conn.mockGetNamedCalls('query')
self.assertEqual(len(calls), 1)
calls[0].checkArgs('QUERY')
calls[0].checkArgs('SELECT ')
def test_query2(self):
# test the OperationalError exception
......@@ -73,12 +73,12 @@ class StorageMySQSLdbTests(StorageDBTests):
self.connect_called = False
def connect_hook():
# mock object, break raise/connect loop
self.db.conn = Mock({'num_rows': 0})
self.db.conn = Mock()
self.connect_called = True
self.db._connect = connect_hook
# make a query, exception will be raised then connect() will be
# called and the second query will use the mock object
self.db.query('QUERY')
self.db.query('INSERT')
self.assertTrue(self.connect_called)
def test_query3(self):
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!