Commit 8a8ea3bb authored by Vincent Pelletier's avatar Vincent Pelletier

Fix same thread concurency mistake as in ZMySQLDDA/db.py (apply similar changes as r13845:13847).


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13850 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent f0f84789
...@@ -86,7 +86,7 @@ ...@@ -86,7 +86,7 @@
database_type='MySQL' database_type='MySQL'
import os import os
from db import DeferredDB from db import ThreadedDeferredDB
import Shared.DC.ZRDB.Connection, sys, DABase import Shared.DC.ZRDB.Connection, sys, DABase
from App.Dialogs import MessageDialog from App.Dialogs import MessageDialog
from Globals import HTMLFile from Globals import HTMLFile
...@@ -120,7 +120,7 @@ class DeferredConnection(DABase.Connection): ...@@ -120,7 +120,7 @@ class DeferredConnection(DABase.Connection):
manage_properties=HTMLFile('connectionEdit', globals()) manage_properties=HTMLFile('connectionEdit', globals())
def factory(self): return DeferredDB def factory(self): return ThreadedDeferredDB
def connect(self, s): def connect(self, s):
try: try:
...@@ -133,8 +133,8 @@ class DeferredConnection(DABase.Connection): ...@@ -133,8 +133,8 @@ class DeferredConnection(DABase.Connection):
else: else:
if connection is not None: if connection is not None:
connection.closeConnection() connection.closeConnection()
DB = self.factory() ThreadedDeferredDB = self.factory()
database_connection_pool[pool_key] = DeferredDB(s) database_connection_pool[pool_key] = ThreadedDeferredDB(s)
self._v_database_connection = database_connection_pool[pool_key] self._v_database_connection = database_connection_pool[pool_key]
# XXX If date is used as such, it can be wrong because an existing # XXX If date is used as such, it can be wrong because an existing
# connection may be reused. But this is suposedly only used as a # connection may be reused. But this is suposedly only used as a
......
...@@ -104,7 +104,7 @@ from zLOG import LOG, ERROR, INFO ...@@ -104,7 +104,7 @@ from zLOG import LOG, ERROR, INFO
import string, sys import string, sys
from string import strip, split, find, upper, rfind from string import strip, split, find, upper, rfind
from time import time from time import time
from thread import get_ident from thread import get_ident, allocate_lock
hosed_connection = ( hosed_connection = (
CR.SERVER_GONE_ERROR, CR.SERVER_GONE_ERROR,
...@@ -154,28 +154,13 @@ def int_or_long(s): ...@@ -154,28 +154,13 @@ def int_or_long(s):
try: return int(s) try: return int(s)
except: return long(s) except: return long(s)
class DeferredDB(TM): class ThreadedDeferredDB:
""" """
An experimental MySQL DA which implements deferred execution An experimental MySQL DA which implements deferred execution
of SQL code in order to reduce locks and provide better behaviour of SQL code in order to reduce locks and provide better behaviour
with MyISAM non transactional tables with MyISAM non transactional tables
""" """
Database_Connection=_mysql.connect
Database_Error=_mysql.Error
def Database_Connection(self, *args, **kwargs):
return MySQLdb.connect(*args, **kwargs)
defs={
FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
}
conv=conversions.copy() conv=conversions.copy()
conv[FIELD_TYPE.LONG] = int_or_long conv[FIELD_TYPE.LONG] = int_or_long
conv[FIELD_TYPE.DATETIME] = DateTime_or_None conv[FIELD_TYPE.DATETIME] = DateTime_or_None
...@@ -183,15 +168,19 @@ class DeferredDB(TM): ...@@ -183,15 +168,19 @@ class DeferredDB(TM):
conv[FIELD_TYPE.DECIMAL] = float conv[FIELD_TYPE.DECIMAL] = float
del conv[FIELD_TYPE.TIME] del conv[FIELD_TYPE.TIME]
_p_oid=_p_changed=_registered=None
def __init__(self,connection): def __init__(self,connection):
self.connection=connection """
self.kwargs = self._parse_connection_string(connection) Parse the connection string.
self.db = {} Initiate a trial connection with the database to check
self._finished_or_aborted = {} transactionality once instead of once per DeferredDB instance.
db = self._getConnection() """
transactional = db.server_capabilities & CLIENT.TRANSACTIONS self._connection = connection
self._kw_args = self._parse_connection_string(connection)
self._db_pool = {}
self._db_lock = allocate_lock()
connection = MySQLdb.connect(**self._kw_args)
transactional = connection.server_capabilities & CLIENT.TRANSACTIONS
connection.close()
if self._try_transactions == '-': if self._try_transactions == '-':
transactional = 0 transactional = 0
elif not transactional and self._try_transactions == '+': elif not transactional and self._try_transactions == '+':
...@@ -199,48 +188,6 @@ class DeferredDB(TM): ...@@ -199,48 +188,6 @@ class DeferredDB(TM):
self._use_TM = self._transactions = transactional self._use_TM = self._transactions = transactional
if self._mysql_lock: if self._mysql_lock:
self._use_TM = 1 self._use_TM = 1
self._sql_string_list_dict = {}
def __del__(self):
self._cleanupConnections()
def _getFinishedOrAborted(self):
return self._finished_or_aborted[get_ident()]
def _setFinishedOrAborted(self, value):
self._finished_or_aborted[get_ident()] = value
def _cleanupConnections(self):
for db in self.db.itervalues():
db.close()
def _forceReconnection(self):
db = apply(self.Database_Connection, (), self.kwargs)
self.db[get_ident()] = db
return db
def _getConnection(self):
ident = get_ident()
db = self.db.get(ident)
if db is None:
db = self._forceReconnection()
return db
def _closeConnection(self):
ident = get_ident()
db = self.db.get(ident)
if db is not None:
db.close()
del self.db[ident]
def _emptySQLStringList(self):
self._sql_string_list_dict[get_ident()] = []
def _appendToSQLStringList(self, value):
self._sql_string_list_dict[get_ident()].append(value)
def _getSQLStringList(self):
return self._sql_string_list_dict[get_ident()]
def _parse_connection_string(self, connection): def _parse_connection_string(self, connection):
kwargs = {'conv': self.conv} kwargs = {'conv': self.conv}
...@@ -279,6 +226,81 @@ class DeferredDB(TM): ...@@ -279,6 +226,81 @@ class DeferredDB(TM):
kwargs['unix_socket'], items = items[0], items[1:] kwargs['unix_socket'], items = items[0], items[1:]
return kwargs return kwargs
def _pool_set(self, key, value):
self._db_lock.acquire()
try:
self._db_pool[key] = value
finally:
self._db_lock.release()
def _pool_get(self, key):
self._db_lock.acquire()
try:
return self._db_pool.get(key)
finally:
self._db_lock.release()
def _access_db(self, method_id, args, kw):
"""
Generic method to call pooled objects' methods.
When the current thread had never issued any call, create a DeferredDB
instance.
"""
ident = get_ident()
db = self._pool_get(ident)
if db is None:
db = DeferredDB(kw_args=self._kw_args, use_TM=self._use_TM,
mysql_lock=self._mysql_lock,
transactions=self._transactions)
self._pool_set(ident, db)
return getattr(db, method_id)(*args, **kw)
def tables(self, *args, **kw):
return self._access_db(method_id='tables', args=args, kw=kw)
def columns(self, *args, **kw):
return self._access_db(method_id='columns', args=args, kw=kw)
def query(self, *args, **kw):
return self._access_db(method_id='query', args=args, kw=kw)
def string_literal(self, *args, **kw):
return self._access_db(method_id='string_literal', args=args, kw=kw)
class DeferredDB(TM):
"""
An experimental MySQL DA which implements deferred execution
of SQL code in order to reduce locks and provide better behaviour
with MyISAM non transactional tables
"""
defs={
FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
}
_p_oid=_p_changed=_registered=None
def __init__(self, kw_args, use_TM, mysql_lock, transactions):
self._kw_args = kw_args
self._mysql_lock = mysql_lock
self._use_TM = use_TM
self._transactions = transactions
self._forceReconnection()
self._sql_string_list = []
def __del__(self):
self.db.close()
def _forceReconnection(self):
db = MySQLdb.connect(**self._kw_args)
self.db = db
def tables(self, rdb=0, def tables(self, rdb=0,
_care=('TABLE', 'VIEW')): _care=('TABLE', 'VIEW')):
r=[] r=[]
...@@ -344,9 +366,8 @@ class DeferredDB(TM): ...@@ -344,9 +366,8 @@ class DeferredDB(TM):
because they are bound to the connection. This check can be because they are bound to the connection. This check can be
overridden by passing force_reconnect with True value. overridden by passing force_reconnect with True value.
""" """
db = self._getConnection()
try: try:
db.query(query) self.db.query(query)
except OperationalError, m: except OperationalError, m:
if ((not force_reconnect) and \ if ((not force_reconnect) and \
(self._mysql_lock or self._transactions)) or \ (self._mysql_lock or self._transactions)) or \
...@@ -354,8 +375,8 @@ class DeferredDB(TM): ...@@ -354,8 +375,8 @@ class DeferredDB(TM):
raise raise
# Hm. maybe the db is hosed. Let's restart it. # Hm. maybe the db is hosed. Let's restart it.
self._forceReconnection() self._forceReconnection()
db.query(query) self.db.query(query)
return db.store_result() return self.db.store_result()
def query(self,query_string, max_rows=1000): def query(self,query_string, max_rows=1000):
self._use_TM and self._register() self._use_TM and self._register()
...@@ -363,31 +384,31 @@ class DeferredDB(TM): ...@@ -363,31 +384,31 @@ class DeferredDB(TM):
qtype = upper(split(qs, None, 1)[0]) qtype = upper(split(qs, None, 1)[0])
if qtype == "SELECT": if qtype == "SELECT":
raise NotSupportedError, "can not SELECT in deferred connections" raise NotSupportedError, "can not SELECT in deferred connections"
self._appendToSQLStringList(qs) self._sql_string_list.append(qs)
return (),() return (),()
def string_literal(self, s): def string_literal(self, s):
return self._getConnection().string_literal(s) return self.db.string_literal(s)
def _begin(self, *ignored): def _begin(self, *ignored):
# The Deferred DB instance is sometimes used for several # The Deferred DB instance is sometimes used for several
# transactions, so it is required to clear the sql_string_list # transactions, so it is required to clear the sql_string_list
# each time a transaction starts # each time a transaction starts
self._emptySQLStringList() self._sql_string_list = []
self._setFinishedOrAborted(False) self._transaction_begun = True
def _finish(self, *ignored): def _finish(self, *ignored):
if self._getFinishedOrAborted(): if not self._transaction_begun:
return return
self._setFinishedOrAborted(True) self._transaction_begun = False
# Ping the database to reconnect if connection was lost. # Ping the database to reconnect if connection was lost.
self._query("SELECT 1", force_reconnect=True) self._query("SELECT 1", force_reconnect=True)
if self._transactions: if self._transactions:
self._query("BEGIN") self._query("BEGIN")
if self._mysql_lock: if self._mysql_lock:
self._query("SELECT GET_LOCK('%s',0)" % self._mysql_lock) self._query("SELECT GET_LOCK('%s',0)" % self._mysql_lock)
for qs in self._getSQLStringList(): for qs in self._sql_string_list:
self._query(qs) self._query(qs)
if self._mysql_lock: if self._mysql_lock:
self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock) self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
...@@ -395,5 +416,5 @@ class DeferredDB(TM): ...@@ -395,5 +416,5 @@ class DeferredDB(TM):
self._query("COMMIT") self._query("COMMIT")
def _abort(self, *ignored): def _abort(self, *ignored):
self._setFinishedOrAborted(True) self._transaction_begun = False
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment