Commit 256c53ca authored by Vincent Pelletier's avatar Vincent Pelletier

Split Dispatcher class into 2:

 - There is no need for dispatcher to be a thread
 - Polling thread does not need to be a dispatcher
In turns, this allows simplifying Storage.__init__ by moving Dispatcher, ThreadedPoll and EventManager class instantiations to App class.
Add a TODO about missing ThreadedPoll shutdown.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@323 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d5a2938b
...@@ -21,10 +21,8 @@ from ZODB import BaseStorage, ConflictResolution, POSException ...@@ -21,10 +21,8 @@ from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.utils import oid_repr, p64, u64 from ZODB.utils import oid_repr, p64, u64
import logging import logging
from neo.client.dispatcher import Dispatcher
from neo.client.app import Application from neo.client.app import Application
from neo.client.exception import NEOStorageConflictError, NEOStorageNotFoundError from neo.client.exception import NEOStorageConflictError, NEOStorageNotFoundError
from neo.event import EventManager
from neo.util import dump from neo.util import dump
class Storage(BaseStorage.BaseStorage, class Storage(BaseStorage.BaseStorage,
...@@ -39,13 +37,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -39,13 +37,7 @@ class Storage(BaseStorage.BaseStorage,
l = Lock() l = Lock()
self._txn_lock_acquire = l.acquire self._txn_lock_acquire = l.acquire
self._txn_lock_release = l.release self._txn_lock_release = l.release
# Create the event manager self.app = Application(master_nodes, name, connector)
em = EventManager()
# Create dispatcher thread
dispatcher = Dispatcher(em)
dispatcher.setDaemon(True)
dispatcher.start()
self.app = Application(master_nodes, name, em, dispatcher, connector)
def load(self, oid, version=None): def load(self, oid, version=None):
try: try:
......
...@@ -36,6 +36,9 @@ from neo.client.exception import NEOStorageError, NEOStorageConflictError, \ ...@@ -36,6 +36,9 @@ from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError NEOStorageNotFoundError
from neo.util import makeChecksum, dump from neo.util import makeChecksum, dump
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
from neo.client.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.event import EventManager
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.utils import p64, u64, oid_repr from ZODB.utils import p64, u64, oid_repr
...@@ -178,14 +181,17 @@ class ConnectionPool(object): ...@@ -178,14 +181,17 @@ class ConnectionPool(object):
class Application(object): class Application(object):
"""The client node application.""" """The client node application."""
def __init__(self, master_nodes, name, em, dispatcher, connector, **kw): def __init__(self, master_nodes, name, connector, **kw):
logging.basicConfig(level = logging.DEBUG) logging.basicConfig(level = logging.DEBUG)
logging.debug('master node address are %s' %(master_nodes,)) logging.debug('master node address are %s' %(master_nodes,))
em = EventManager()
# Start polling thread
self.poll_thread = ThreadedPoll(em)
# Internal Attributes common to all thread # Internal Attributes common to all thread
self.name = name self.name = name
self.em = em self.em = em
self.connector_handler = getConnectorHandler(connector) self.connector_handler = getConnectorHandler(connector)
self.dispatcher = dispatcher self.dispatcher = Dispatcher()
self.nm = NodeManager() self.nm = NodeManager()
self.cp = ConnectionPool(self) self.cp = ConnectionPool(self)
self.pt = None self.pt = None
...@@ -198,7 +204,7 @@ class Application(object): ...@@ -198,7 +204,7 @@ class Application(object):
self.ptid = None self.ptid = None
self.num_replicas = 0 self.num_replicas = 0
self.num_partitions = 0 self.num_partitions = 0
self.answer_handler = ClientAnswerEventHandler(self, dispatcher) self.answer_handler = ClientAnswerEventHandler(self, self.dispatcher)
# Transaction specific variable # Transaction specific variable
self.tid = None self.tid = None
self.txn = None self.txn = None
...@@ -895,6 +901,7 @@ class Application(object): ...@@ -895,6 +901,7 @@ class Application(object):
def __del__(self): def __del__(self):
"""Clear all connection.""" """Clear all connection."""
# TODO: Stop polling thread here.
# Due to bug in ZODB, close is not always called when shutting # Due to bug in ZODB, close is not always called when shutting
# down zope, so use __del__ to close connections # down zope, so use __del__ to close connections
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
......
...@@ -15,29 +15,14 @@ ...@@ -15,29 +15,14 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Thread class Dispatcher:
import logging
class Dispatcher(Thread):
"""Dispatcher class use to redirect request to thread.""" """Dispatcher class use to redirect request to thread."""
def __init__(self, em, **kw): def __init__(self):
Thread.__init__(self, **kw)
self.em = em
# This dict is used to associate conn/message id to client thread queue # This dict is used to associate conn/message id to client thread queue
# and thus redispatch answer to the original thread # and thus redispatch answer to the original thread
self.message_table = {} self.message_table = {}
def run(self):
while 1:
# First check if we receive any new message from other node
try:
self.em.poll(None)
except KeyError:
# This happen when there is no connection
logging.error('Dispatcher, run, poll returned a KeyError')
def getQueue(self, conn, packet): def getQueue(self, conn, packet):
key = (id(conn), packet.getId()) key = (id(conn), packet.getId())
return self.message_table.pop(key, None) return self.message_table.pop(key, None)
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Thread
import logging
class ThreadedPoll(Thread):
"""Polling thread."""
def __init__(self, em, **kw):
Thread.__init__(self, **kw)
self.em = em
self.setDaemon(True)
self.start()
def run(self):
while 1:
# First check if we receive any new message from other node
try:
self.em.poll(None)
except KeyError:
# This happen when there is no connection
# XXX: This should be handled inside event manager, not here.
logging.error('Dispatcher, run, poll returned a KeyError')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment