Commit f5f42522 by Julien Muchembled

Move code from neo.client to neo.lib, since admins will be also multi-threaded

parent 50d25d00
......@@ -16,7 +16,6 @@
from cPickle import dumps, loads
from zlib import compress, decompress
from neo.lib.locking import Empty
from random import shuffle
import heapq
import time
......@@ -33,19 +32,18 @@ from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Lock
from neo.lib.locking import Empty, Lock, SimpleQueue
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.node import NodeManager
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from .poll import ThreadedPoll, psThreadedPoll
from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import p64, u64, parseMasterList
from neo.lib.debug import register as registerLiveDebugger
from .container import ThreadContainer, TransactionContainer
CHECKED_SERIAL = master.CHECKED_SERIAL
......@@ -57,41 +55,55 @@ if SignalHandler:
import signal
SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)
class app_set(weakref.WeakSet):
def on_log(self):
for app in self:
app.log()
class TransactionContainer(dict):
app_set = app_set()
registerLiveDebugger(app_set.on_log)
def pop(self, txn):
return dict.pop(self, id(txn), None)
class Application(object):
def get(self, txn):
try:
return self[id(txn)]
except KeyError:
raise StorageTransactionError("unknown transaction %r" % txn)
def new(self, txn):
key = id(txn)
if key in self:
raise StorageTransactionError("commit of transaction %r"
" already started" % txn)
context = self[key] = {
'queue': SimpleQueue(),
'txn': txn,
'ttid': None,
'data_dict': {},
'data_size': 0,
'cache_dict': {},
'cache_size': 0,
'object_base_serial_dict': {},
'object_serial_dict': {},
'object_stored_counter_dict': {},
'conflict_serial_dict': {},
'resolved_conflict_serial_dict': {},
'involved_nodes': set(),
}
return context
class Application(ThreadedApplication):
"""The client node application."""
def __init__(self, master_nodes, name, compress=True,
dynamic_master_list=None):
# Start polling thread
self.em = EventManager()
self.poll_thread = ThreadedPoll(self.em, name=name)
psThreadedPoll()
super(Application, self).__init__(parseMasterList(master_nodes),
name, dynamic_master_list)
# Internal Attributes common to all thread
self._db = None
self.name = name
self.dispatcher = Dispatcher(self.poll_thread)
self.nm = NodeManager(dynamic_master_list)
self.cp = ConnectionPool(self)
self.master_conn = None
self.primary_master_node = None
self.trying_master_node = None
# load master node list
for address in parseMasterList(master_nodes):
self.nm.createMaster(address=address)
# no self-assigned UUID, primary master will supply us one
self.uuid = None
self._cache = ClientCache()
self._loading_oid = None
self.new_oid_list = ()
......@@ -103,8 +115,6 @@ class Application(object):
self.primary_handler = master.PrimaryAnswersHandler(self)
self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
self.notifications_handler = master.PrimaryNotificationsHandler( self)
# Internal attribute distinct between thread
self._thread_container = ThreadContainer()
self._txn_container = TransactionContainer()
# Lock definition :
# _load_lock is used to make loading and storing atomic
......@@ -124,7 +134,11 @@ class Application(object):
# node connection attemps
self._connecting_to_master_node = Lock()
self.compress = compress
app_set.add(self) # to register self.on_log
def close(self):
self.cp.flush()
self._txn_container.clear()
super(Application, self).close()
def __getattr__(self, attr):
if attr == 'pt':
......@@ -136,45 +150,6 @@ class Application(object):
# do not iter lazily to avoid race condition
return self._txn_container.values
def getHandlerData(self):
return self._thread_container.answer
def setHandlerData(self, data):
self._thread_container.answer = data
def log(self):
self.em.log()
self.nm.log()
pt = self.__dict__.get('pt')
if pt is not None:
pt.log()
def _handlePacket(self, conn, packet, kw={}, handler=None):
"""
conn
The connection which received the packet (forwarded to handler).
packet
The packet to handle.
handler
The handler to use to handle packet.
If not given, it will be guessed from connection's not type.
"""
if handler is None:
# Guess the handler to use based on the type of node on the
# connection
node = self.nm.getByAddress(conn.getAddress())
if node is None:
raise ValueError, 'Expecting an answer from a node ' \
'which type is not known... Is this right ?'
if node.isStorage():
handler = self.storage_handler
elif node.isMaster():
handler = self.primary_handler
else:
raise ValueError, 'Unknown node type: %r' % (node.__class__, )
with conn.lock:
handler.dispatch(conn, packet, kw)
def _waitAnyMessage(self, queue, block=True):
"""
Handle all pending packets.
......@@ -212,29 +187,6 @@ class Application(object):
# Don't leave access to thread context, even if a raise happens.
self.setHandlerData(None)
def _ask(self, conn, packet, handler=None, **kw):
self.setHandlerData(None)
queue = self._thread_container.queue
msg_id = conn.ask(packet, queue=queue, **kw)
get = queue.get
_handlePacket = self._handlePacket
while True:
qconn, qpacket, kw = get(True)
is_forgotten = isinstance(qpacket, ForgottenPacket)
if conn is qconn:
# check fake packet
if qpacket is None:
raise ConnectionClosed
if msg_id == qpacket.getId():
if is_forgotten:
raise ValueError, 'ForgottenPacket for an ' \
'explicitely expected packet.'
_handlePacket(qconn, qpacket, kw, handler)
break
if not is_forgotten and qpacket is not None:
_handlePacket(qconn, qpacket, kw)
return self.getHandlerData()
def _askStorage(self, conn, packet, **kw):
""" Send a request to a storage node and process its answer """
return self._ask(conn, packet, handler=self.storage_handler, **kw)
......@@ -946,20 +898,6 @@ class Application(object):
self._askPrimary(Packets.AskLastTransaction())
return self.last_tid
def __del__(self):
"""Clear all connection."""
# Due to bug in ZODB, close is not always called when shutting
# down zope, so use __del__ to close connections
for conn in self.em.getConnectionList():
conn.close()
self.cp.flush()
self.master_conn = None
# Stop polling thread
logging.debug('Stopping %s', self.poll_thread)
self.poll_thread.stop()
psThreadedPoll()
close = __del__
def pack(self, t):
tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
if tid == ZERO_TID:
......
#
# Copyright (C) 2011-2015 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
from neo.lib.locking import Lock, Empty
from collections import deque
from ZODB.POSException import StorageTransactionError
class SimpleQueue(object):
"""
Similar to Queue.Queue but with simpler locking scheme, reducing lock
contention on "put" (benchmark shows 60% less time spent in "put").
As a result:
- only a single consumer possible ("get" vs. "get" race condition)
- only a single producer possible ("put" vs. "put" race condition)
- no blocking size limit possible
- no consumer -> producer notifications (task_done/join API)
Queue is on the critical path: any moment spent here increases client
application wait for object data, transaction completion, etc.
As we have a single consumer (client application's thread) and a single
producer (lib.dispatcher, which can be called from several threads but
serialises calls internally) for each queue, Queue.Queue's locking scheme
can be relaxed to reduce latency.
"""
__slots__ = ('_lock', '_unlock', '_popleft', '_append', '_queue')
def __init__(self):
lock = Lock()
self._lock = lock.acquire
self._unlock = lock.release
self._queue = queue = deque()
self._popleft = queue.popleft
self._append = queue.append
def get(self, block):
if block:
self._lock(False)
while True:
try:
return self._popleft()
except IndexError:
if not block:
raise Empty
self._lock()
def put(self, item):
self._append(item)
self._lock(False)
self._unlock()
def empty(self):
return not self._queue
class ThreadContainer(threading.local):
def __init__(self):
self.queue = SimpleQueue()
self.answer = None
class TransactionContainer(dict):
def pop(self, txn):
return dict.pop(self, id(txn), None)
def get(self, txn):
try:
return self[id(txn)]
except KeyError:
raise StorageTransactionError("unknown transaction %r" % txn)
def new(self, txn):
key = id(txn)
if key in self:
raise StorageTransactionError("commit of transaction %r"
" already started" % txn)
context = self[key] = {
'queue': SimpleQueue(),
'txn': txn,
'ttid': None,
'data_dict': {},
'data_size': 0,
'cache_dict': {},
'cache_size': 0,
'object_base_serial_dict': {},
'object_serial_dict': {},
'object_stored_counter_dict': {},
'conflict_serial_dict': {},
'resolved_conflict_serial_dict': {},
'involved_nodes': set(),
}
return context
......@@ -14,51 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.handler import EventHandler
from neo.lib.protocol import ProtocolError, Packets
from neo.lib import handler
from ZODB.POSException import StorageError
class BaseHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
def __init__(self, app):
super(BaseHandler, self).__init__(app)
self.dispatcher = app.dispatcher
def dispatch(self, conn, packet, kw={}):
assert conn.lock._is_owned() # XXX: see also lockCheckWrapper
super(BaseHandler, self).dispatch(conn, packet, kw)
def packetReceived(self, conn, packet, kw={}):
"""Redirect all received packet to dispatcher thread."""
if packet.isResponse() and type(packet) is not Packets.Pong:
if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw):
raise ProtocolError('Unexpected response packet from %r: %r'
% (conn, packet))
else:
self.dispatch(conn, packet, kw)
def connectionLost(self, conn, new_state):
self.app.dispatcher.unregister(conn)
def connectionFailed(self, conn):
self.app.dispatcher.unregister(conn)
def unexpectedInAnswerHandler(*args, **kw):
raise Exception('Unexpected event in an answer handler')
class AnswerBaseHandler(EventHandler):
connectionStarted = unexpectedInAnswerHandler
connectionCompleted = unexpectedInAnswerHandler
connectionFailed = unexpectedInAnswerHandler
connectionAccepted = unexpectedInAnswerHandler
timeoutExpired = unexpectedInAnswerHandler
connectionClosed = unexpectedInAnswerHandler
packetReceived = unexpectedInAnswerHandler
peerBroken = unexpectedInAnswerHandler
class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX
def protocolError(self, conn, message):
raise StorageError("protocol error: %s" % message)
......@@ -15,10 +15,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import MTEventHandler
from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeStates, Packets, ProtocolError
from neo.lib.util import dump, add64
from . import BaseHandler, AnswerBaseHandler
from . import AnswerBaseHandler
from ..exception import NEOStorageError
CHECKED_SERIAL = object()
......@@ -91,7 +92,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
def answerLastTransaction(self, conn, ltid):
pass
class PrimaryNotificationsHandler(BaseHandler):
class PrimaryNotificationsHandler(MTEventHandler):
""" Handler that process the notifications from the primary master """
def packetReceived(self, conn, packet, kw={}):
......@@ -133,7 +134,7 @@ class PrimaryNotificationsHandler(BaseHandler):
callback(tid)
finally:
app._cache_lock_release()
BaseHandler.packetReceived(self, conn, packet, kw)
MTEventHandler.packetReceived(self, conn, packet, kw)
def connectionClosed(self, conn):
app = self.app
......
......@@ -21,11 +21,12 @@ from neo.lib import logging
from neo.lib.protocol import LockState, ZERO_TID
from neo.lib.util import dump
from neo.lib.exception import NodeNotReady
from . import BaseHandler, AnswerBaseHandler
from neo.lib.handler import MTEventHandler
from . import AnswerBaseHandler
from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageDoesNotExistError
class StorageEventHandler(BaseHandler):
class StorageEventHandler(MTEventHandler):
def connectionLost(self, conn, new_state):
node = self.app.nm.getByAddress(conn.getAddress())
......
......@@ -196,3 +196,46 @@ class EventHandler(object):
def backendNotImplemented(self, conn, message):
raise NotImplementedError(message)
class MTEventHandler(EventHandler):
"""Base class of handler implementations for MTClientConnection"""
def __init__(self, app):
super(MTEventHandler, self).__init__(app)
self.dispatcher = app.dispatcher
def dispatch(self, conn, packet, kw={}):
assert conn.lock._is_owned() # XXX: see also lockCheckWrapper
super(MTEventHandler, self).dispatch(conn, packet, kw)
def packetReceived(self, conn, packet, kw={}):
"""Redirect all received packet to dispatcher thread."""
if packet.isResponse() and type(packet) is not Packets.Pong:
if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw):
raise ProtocolError('Unexpected response packet from %r: %r'
% (conn, packet))
else:
self.dispatch(conn, packet, kw)
def connectionLost(self, conn, new_state):
self.dispatcher.unregister(conn)
def connectionFailed(self, conn):
self.dispatcher.unregister(conn)
def unexpectedInAnswerHandler(*args, **kw):
raise Exception('Unexpected event in an answer handler')
class AnswerBaseHandler(EventHandler):
connectionStarted = unexpectedInAnswerHandler
connectionCompleted = unexpectedInAnswerHandler
connectionFailed = unexpectedInAnswerHandler
connectionAccepted = unexpectedInAnswerHandler
timeoutExpired = unexpectedInAnswerHandler
connectionClosed = unexpectedInAnswerHandler
packetReceived = unexpectedInAnswerHandler
peerBroken = unexpectedInAnswerHandler
protocolError = unexpectedInAnswerHandler
......@@ -2,6 +2,7 @@ import os
import sys
import threading
import traceback
from collections import deque
from time import time
from Queue import Empty
......@@ -164,3 +165,49 @@ else:
Lock = threading.Lock
RLock = threading.RLock
Semaphore = threading.Semaphore
class SimpleQueue(object):
"""
Similar to Queue.Queue but with simpler locking scheme, reducing lock
contention on "put" (benchmark shows 60% less time spent in "put").
As a result:
- only a single consumer possible ("get" vs. "get" race condition)
- only a single producer possible ("put" vs. "put" race condition)
- no blocking size limit possible
- no consumer -> producer notifications (task_done/join API)
Queue is on the critical path: any moment spent here increases client
application wait for object data, transaction completion, etc.
As we have a single consumer (client application's thread) and a single
producer (lib.dispatcher, which can be called from several threads but
serialises calls internally) for each queue, Queue.Queue's locking scheme
can be relaxed to reduce latency.
"""
__slots__ = ('_lock', '_unlock', '_popleft', '_append', '_queue')
def __init__(self):
lock = Lock()
self._lock = lock.acquire
self._unlock = lock.release
self._queue = queue = deque()
self._popleft = queue.popleft
self._append = queue.append
def get(self, block):
if block:
self._lock(False)
while True:
try:
return self._popleft()
except IndexError:
if not block:
raise Empty
self._lock()
def put(self, item):
self._append(item)
self._lock(False)
self._unlock()
def empty(self):
return not self._queue
#
# Copyright (C) 2006-2015 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading, weakref
from . import logging
from .connection import ConnectionClosed
from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher, ForgottenPacket
from .event import EventManager
from .locking import SimpleQueue
from .node import NodeManager
from .protocol import Packets
from .threaded_poll import ThreadedPoll, psThreadedPoll
class app_set(weakref.WeakSet):
def on_log(self):
for app in self:
app.log()
app_set = app_set()
registerLiveDebugger(app_set.on_log)
class ThreadContainer(threading.local):
def __init__(self):
self.queue = SimpleQueue()
self.answer = None
class ThreadedApplication(object):
"""The client node application."""
def __init__(self, master_nodes, name, dynamic_master_list=None):
# Start polling thread
self.em = EventManager()
self.poll_thread = ThreadedPoll(self.em, name=name)
psThreadedPoll()
# Internal Attributes common to all thread
self.name = name
self.dispatcher = Dispatcher(self.poll_thread)
self.nm = NodeManager(dynamic_master_list)
self.master_conn = None
# load master node list
for address in master_nodes:
self.nm.createMaster(address=address)
# no self-assigned UUID, primary master will supply us one
self.uuid = None
# Internal attribute distinct between thread
self._thread_container = ThreadContainer()
app_set.add(self) # to register self.on_log
def __del__(self):
# Due to bug in ZODB, close is not always called when shutting
# down zope, so use __del__ to close connections
self.close()
def close(self):
# Clear all connection
self.master_conn = None
for conn in self.em.getConnectionList():
conn.close()
# Stop polling thread
logging.debug('Stopping %s', self.poll_thread)
self.poll_thread.stop()
psThreadedPoll()
def getHandlerData(self):
return self._thread_container.answer
def setHandlerData(self, data):
self._thread_container.answer = data
def log(self):
self.em.log()
self.nm.log()
pt = self.__dict__.get('pt')
if pt is not None:
pt.log()
def _handlePacket(self, conn, packet, kw={}, handler=None):
"""
conn
The connection which received the packet (forwarded to handler).
packet
The packet to handle.
handler
The handler to use to handle packet.
If not given, it will be guessed from connection's not type.
"""
if handler is None:
# Guess the handler to use based on the type of node on the
# connection
node = self.nm.getByAddress(conn.getAddress())
if node is None:
raise ValueError, 'Expecting an answer from a node ' \
'which type is not known... Is this right ?'
if node.isStorage():
handler = self.storage_handler
elif node.isMaster():
handler = self.primary_handler
else:
raise ValueError, 'Unknown node type: %r' % (node.__class__, )
with conn.lock:
handler.dispatch(conn, packet, kw)
def _ask(self, conn, packet, handler=None, **kw):
self.setHandlerData(None)
queue = self._thread_container.queue
msg_id = conn.ask(packet, queue=queue, **kw)
get = queue.get
_handlePacket = self._handlePacket
while True:
qconn, qpacket, kw = get(True)
is_forgotten = isinstance(qpacket, ForgottenPacket)
if conn is qconn:
# check fake packet
if qpacket is None:
raise ConnectionClosed
if msg_id == qpacket.getId():
if is_forgotten:
raise ValueError, 'ForgottenPacket for an ' \
'explicitely expected packet.'
_handlePacket(qconn, qpacket, kw, handler)
break
if not is_forgotten and qpacket is not None:
_handlePacket(qconn, qpacket, kw)
return self.getHandlerData()
......@@ -15,8 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from threading import Thread, enumerate as thread_enum
from neo.lib import logging
from neo.lib.locking import Lock
from . import logging
from .locking import Lock
class _ThreadedPoll(Thread):
"""Polling thread."""
......
......@@ -29,12 +29,12 @@ import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
from neo.client.container import SimpleQueue
from neo.client.poll import _ThreadedPoll
from neo.lib.threaded_poll import _ThreadedPoll
from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment