Commit 3bd82ef1 authored by Julien Muchembled's avatar Julien Muchembled

Remove unused ForgottenPacket mechanism (MTConnection)

It was only used by the now removed HasLock.
parent bf03a305
...@@ -36,7 +36,6 @@ from neo.lib.connection import MTClientConnection, ConnectionClosed ...@@ -36,7 +36,6 @@ from neo.lib.connection import MTClientConnection, ConnectionClosed
from .exception import NEOStorageError, NEOStorageCreationUndoneError from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError from .exception import NEOStorageNotFoundError
from .handlers import storage, master from .handlers import storage, master
from neo.lib.dispatcher import ForgottenPacket
from neo.lib.threaded_app import ThreadedApplication from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache from .cache import ClientCache
from .pool import ConnectionPool from .pool import ConnectionPool
...@@ -173,8 +172,8 @@ class Application(ThreadedApplication): ...@@ -173,8 +172,8 @@ class Application(ThreadedApplication):
conn, packet, kw = get(block) conn, packet, kw = get(block)
except Empty: except Empty:
break break
if packet is None or isinstance(packet, ForgottenPacket): if packet is None:
# connection was closed or some packet was forgotten # connection was closed
continue continue
block = False block = False
try: try:
......
...@@ -19,18 +19,6 @@ from .locking import Lock, Empty ...@@ -19,18 +19,6 @@ from .locking import Lock, Empty
EMPTY = {} EMPTY = {}
NOBODY = [] NOBODY = []
class ForgottenPacket(object):
"""
Instances of this class will be pushed to queue when an expected answer
is being forgotten. Its purpose is similar to pushing "None" when
connection is closed, but the meaning is different.
"""
def __init__(self, msg_id):
self.msg_id = msg_id
def getId(self):
return self.msg_id
def giant_lock(func): def giant_lock(func):
def wrapped(self, *args, **kw): def wrapped(self, *args, **kw):
self.lock_acquire() self.lock_acquire()
...@@ -88,7 +76,7 @@ class Dispatcher: ...@@ -88,7 +76,7 @@ class Dispatcher:
def unregister(self, conn): def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock """ Unregister a connection and put fake packet in queues to unlock
threads excepting responses from that connection """ threads expecting responses from that connection """
self.lock_acquire() self.lock_acquire()
try: try:
message_table = self.message_table.pop(id(conn), EMPTY) message_table = self.message_table.pop(id(conn), EMPTY)
...@@ -105,21 +93,6 @@ class Dispatcher: ...@@ -105,21 +93,6 @@ class Dispatcher:
notified_set.add(queue_id) notified_set.add(queue_id)
_decrefQueue(queue) _decrefQueue(queue)
@giant_lock
def forget(self, conn, msg_id):
""" Forget about a specific message for a specific connection.
Actually makes it "expected by nobody", so we know we can ignore it,
and not detect it as an error. """
message_table = self.message_table[id(conn)]
queue = message_table[msg_id]
if queue is NOBODY:
raise KeyError, 'Already expected by NOBODY: %r, %r' % (
conn, msg_id)
queue.put((conn, ForgottenPacket(msg_id), None))
self.queue_dict[id(queue)] -= 1
message_table[msg_id] = NOBODY
return queue
@giant_lock @giant_lock
def forget_queue(self, queue, flush_queue=True): def forget_queue(self, queue, flush_queue=True):
""" """
...@@ -137,9 +110,7 @@ class Dispatcher: ...@@ -137,9 +110,7 @@ class Dispatcher:
found += 1 found += 1
message_table[msg_id] = NOBODY message_table[msg_id] = NOBODY
refcount = self.queue_dict.pop(id(queue), 0) refcount = self.queue_dict.pop(id(queue), 0)
if refcount != found: assert refcount == found, (refcount, found)
raise ValueError('We hit a refcount bug: %s queue uses ' \
'expected, %s found' % (refcount, found))
if flush_queue: if flush_queue:
get = queue.get get = queue.get
while True: while True:
......
...@@ -19,7 +19,7 @@ from . import logging ...@@ -19,7 +19,7 @@ from . import logging
from .app import BaseApplication from .app import BaseApplication
from .connection import ConnectionClosed from .connection import ConnectionClosed
from .debug import register as registerLiveDebugger from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher, ForgottenPacket from .dispatcher import Dispatcher
from .locking import SimpleQueue from .locking import SimpleQueue
class app_set(weakref.WeakSet): class app_set(weakref.WeakSet):
...@@ -141,17 +141,14 @@ class ThreadedApplication(BaseApplication): ...@@ -141,17 +141,14 @@ class ThreadedApplication(BaseApplication):
_handlePacket = self._handlePacket _handlePacket = self._handlePacket
while True: while True:
qconn, qpacket, kw = get(True) qconn, qpacket, kw = get(True)
is_forgotten = isinstance(qpacket, ForgottenPacket)
if conn is qconn: if conn is qconn:
# check fake packet # check fake packet
if qpacket is None: if qpacket is None:
raise ConnectionClosed raise ConnectionClosed
if msg_id == qpacket.getId(): if msg_id == qpacket.getId():
if is_forgotten:
raise ValueError, 'ForgottenPacket for an ' \
'explicitly expected packet.'
_handlePacket(qconn, qpacket, kw, handler) _handlePacket(qconn, qpacket, kw, handler)
break break
if not is_forgotten and qpacket is not None: elif qpacket is None:
_handlePacket(qconn, qpacket, kw) continue
_handlePacket(qconn, qpacket, kw)
return self.getHandlerData() return self.getHandlerData()
...@@ -46,7 +46,6 @@ UNIT_TEST_MODULES = [ ...@@ -46,7 +46,6 @@ UNIT_TEST_MODULES = [
'neo.tests.testConnection', 'neo.tests.testConnection',
'neo.tests.testHandler', 'neo.tests.testHandler',
'neo.tests.testNodes', 'neo.tests.testNodes',
'neo.tests.testDispatcher',
'neo.tests.testUtil', 'neo.tests.testUtil',
'neo.tests.testPT', 'neo.tests.testPT',
# master application # master application
......
#
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import NeoTestBase
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from Queue import Queue
import unittest
class DispatcherTests(NeoTestBase):
def setUp(self):
NeoTestBase.setUp(self)
self.dispatcher = Dispatcher()
def testForget(self):
conn = object()
queue = Queue()
MARKER = object()
# Register an expectation
self.dispatcher.register(conn, 1, queue)
# ...and forget about it, returning registered queue
forgotten_queue = self.dispatcher.forget(conn, 1)
self.assertTrue(queue is forgotten_queue, (queue, forgotten_queue))
# A ForgottenPacket must have been put in the queue
queue_conn, packet, kw = queue.get(block=False)
self.assertTrue(isinstance(packet, ForgottenPacket), packet)
# ...with appropriate packet id
self.assertEqual(packet.getId(), 1)
# ...and appropriate connection
self.assertTrue(conn is queue_conn, (conn, queue_conn))
# If forgotten twice, it must raise a KeyError
self.assertRaises(KeyError, self.dispatcher.forget, conn, 1)
# Event arrives, return value must be True (it was expected)
self.assertTrue(self.dispatcher.dispatch(conn, 1, MARKER, {}))
# ...but must not have reached the queue
self.assertTrue(queue.empty())
# Register an expectation
self.dispatcher.register(conn, 1, queue)
# ...and forget about it
self.dispatcher.forget(conn, 1)
queue.get(block=False)
# No exception must happen if connection is lost.
self.dispatcher.unregister(conn)
# Forgotten message's queue must not have received a "None"
self.assertTrue(queue.empty())
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment