Commit c42baaef authored by Julien Muchembled

Bump protocol version

parents 3a93658b 092992db
......@@ -4,18 +4,6 @@ or promised features of NEO (marked with N).
All the listed bugs will be fixed with high priority.
(Z) Conflict resolution not fully implemented
---------------------------------------------
Even with a single storage node, so-called 'deadlock avoidance' may
happen in order to resolve conflicts. In such cases, conflicts will not be
resolved even if your _p_resolveConflict() method would succeed, leading to a
normal ConflictError.
Although this should happen rarely enough not to affect performance, this can
be an issue if your application can't afford restarting the transaction,
e.g. because it interacted with an external environment.
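For reference, a minimal sketch of the ZODB hook this entry refers to; the
Counter class and its field are illustrative, not part of NEO:

    from persistent import Persistent

    class Counter(Persistent):
        def __init__(self):
            self.value = 0

        def _p_resolveConflict(self, old, saved, new):
            # old/saved/new are pickled states (dicts of the instance):
            # merge 2 concurrent increments by applying both deltas.
            resolved = dict(saved)
            resolved['value'] = saved['value'] + new['value'] - old['value']
            return resolved

When deadlock avoidance is triggered, such a method is not even called and a
plain ConflictError is raised, which is the limitation described above.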
(N) Storage failure or update may lead to POSException or break undoLog()
-------------------------------------------------------------------------
......
......@@ -15,16 +15,13 @@
General
- Review XXX/TODO code tags (CODE)
- Coverage for functional tests (i.e. collect results from subprocesses)
- When all cells are OUT_OF_DATE in backup mode, the one with the most data
could become UP_TO_DATE with an appropriate backup_tid, so that the cluster
stays operational. (FEATURE)
- Finish renaming UUID into NID everywhere (CODE)
- Implement delayed connection acceptation.
- Delayed connection acceptation even when a storage node is not ready?
Currently, any node that connects too early to another that is busy for
some reason is immediately rejected with the 'not ready' error code. This
should be replaced by a queue in the listening node that keeps a pool of
nodes that will be accepted later, when the conditions are satisfied.
some reason is immediately rejected with the 'not ready' error code.
This is mainly the case for:
- Clients rejected before the cluster is operational
- Empty storages rejected during the recovery process
......@@ -41,18 +38,11 @@
- Clarify handler methods to call when a connection is accepted from a
listening connection and when remote node is identified
(cf. neo/lib/bootstrap.py).
- Choose how to handle a storage integrity verification when it comes back.
Do the replication process, the verification stage, with or without
unfinished transactions? Do cells have to be set as outdated? If yes, should
the partition table changes be broadcasted? (BANDWIDTH, SPEED)
- Make SIGINT on primary master change the cluster to the STOPPING state.
- Review PENDING/HIDDEN/SHUTDOWN states, don't use notifyNodeInformation()
to do a state switch; use an exception-based mechanism? (CODE)
- Review handler split (CODE)
The current handler split is the result of small incremental changes. A
global review is required to make them consistent.
- Review node notifications. E.g. a storage node doesn't have to be notified
of new clients, only when one is lost.
- Review transactional isolation of various methods
Some methods might not implement proper transaction isolation when they
should. An example is object history (undoLog), which can see data
......@@ -63,14 +53,12 @@
partitions. Currently, reads succeed because feeding nodes don't delete
anything while the cluster is operational, for performance reasons:
deletion of dropped partitions must be reimplemented in a scalable way.
(HIGH AVAILABILITY)
The same thing happens for writes: storage nodes must discard
stores/checks of dropped partitions (in lockObject, that can be done by
raising ConflictError(None)). (HIGH AVAILABILITY)
Storage
- Use libmysqld instead of a stand-alone MySQL server.
- Notify master when storage becomes available for clients (LATENCY)
Currently, storage presence is broadcasted to client nodes too early, as
the storage node would refuse them until it has only up-to-date data (not
only up-to-date cells, but also a partition table and node states).
- In backup mode, 2 simultaneous replications should be possible so that:
- outdated cells do not block the backup for too long
- constantly modified partitions do not prevent outdated cells to
......@@ -78,9 +66,7 @@
Current behaviour is undefined and the above 2 scenarios may happen.
- Create a specialized PartitionTable that knows the database and replicator
to remove duplicates and remove logic from handlers (CODE)
- Consider inserting multiple objects at a time in the database, taking care
of the maximum allowed SQL request size. (SPEED)
- Make listening address and port optionnal, and if they are not provided
- Make listening address and port optional, and if they are not provided
listen on all interfaces on any available port.
- Make replication speed configurable (HIGH AVAILABILITY)
In its current implementation, replication runs at lowest priority, to
......@@ -125,15 +111,13 @@
instead of parsing the whole partition table. (SPEED)
Client
- Race conditions on the partition table?
(update by the poll thread vs. access by other threads)
- Merge Application into Storage (SPEED)
- Optimize cache.py by rewriting it either in C or Cython (LOAD LATENCY)
- Use generic bootstrap module (CODE)
- If too many storage nodes are dead, the client should check the partition
table hasn't changed by pinging the master and retry if necessary.
- Implement IStorageRestoreable (ZODB API) in order to preserve data
serials (i.e. undo information).
- Fix and reenable deadlock avoidance (SPEED). This is required for
neo.threaded.test.Test.testDeadlockAvoidance
Admin
- Make admin node able to monitor multiple clusters simultaneously
......@@ -141,6 +125,7 @@
- Add ctl command to list last transactions, like fstail for FileStorage.
Tests
- Split neo/tests/threaded/test.py
- Use another mock library: Python 3.3+ has unittest.mock, which is
available for earlier versions at https://pypi.python.org/pypi/mock
......
......@@ -89,16 +89,16 @@ class Storage(BaseStorage.BaseStorage,
"""
Note: never blocks in NEO.
"""
return self.app.tpc_begin(transaction, tid, status)
return self.app.tpc_begin(self, transaction, tid, status)
def tpc_vote(self, transaction):
return self.app.tpc_vote(transaction, self.tryToResolveConflict)
return self.app.tpc_vote(transaction)
def tpc_abort(self, transaction):
return self.app.tpc_abort(transaction)
def tpc_finish(self, transaction, f=None):
return self.app.tpc_finish(transaction, self.tryToResolveConflict, f)
return self.app.tpc_finish(transaction, f)
def store(self, oid, serial, data, version, transaction):
assert version == '', 'Versions are not supported'
......@@ -128,7 +128,7 @@ class Storage(BaseStorage.BaseStorage,
# undo
def undo(self, transaction_id, txn):
return self.app.undo(transaction_id, txn, self.tryToResolveConflict)
return self.app.undo(transaction_id, txn)
def undoLog(self, first=0, last=-20, filter=None):
return self.app.undoLog(first, last, filter)
......@@ -167,8 +167,7 @@ class Storage(BaseStorage.BaseStorage,
def importFrom(self, source, start=None, stop=None, preindex=None):
""" Allow import only a part of the source storage """
return self.app.importFrom(source, start, stop,
self.tryToResolveConflict, preindex)
return self.app.importFrom(self, source, start, stop, preindex)
def pack(self, t, referencesf, gc=False):
if gc:
......
......@@ -147,7 +147,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
logging.critical(msg)
app.master_conn = None
for txn_context in app.txn_contexts():
txn_context['error'] = msg
txn_context.error = msg
try:
del app.pt
except AttributeError:
......@@ -182,9 +182,9 @@ class PrimaryNotificationsHandler(MTEventHandler):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, node_list):
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryNotificationsHandler, self).notifyNodeInformation(
conn, node_list)
conn, timestamp, node_list)
# XXX: 'update' automatically closes DOWN nodes. Do we really want
# to do the same thing for nodes in other non-running states ?
getByUUID = self.app.nm.getByUUID
......@@ -194,6 +194,13 @@ class PrimaryNotificationsHandler(MTEventHandler):
if node and node.isConnected():
node.getConnection().close()
def notifyDeadlock(self, conn, ttid, locking_tid):
for txn_context in self.app.txn_contexts():
if txn_context.ttid == ttid:
txn_context.conflict_dict[None] = locking_tid, None
txn_context.wakeup(conn)
break
class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """
......@@ -204,6 +211,10 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
oid_list.reverse()
self.app.new_oid_list = oid_list
def incompleteTransaction(self, conn, message):
raise NEOStorageError("storage nodes for which vote failed can not be"
" disconnected without making the cluster non-operational")
def answerTransactionFinished(self, conn, _, tid):
self.app.setHandlerData(tid)
......
......@@ -28,7 +28,7 @@ from .exception import NEOPrimaryMasterLost, NEOStorageError
# failed in the past.
MAX_FAILURE_AGE = 600
# Cell list sort keys
# Cell list sort keys, only for read access
# We are connected to storage node hosting cell, high priority
CELL_CONNECTED = -1
# normal priority
......@@ -36,6 +36,7 @@ CELL_GOOD = 0
# Storage node hosting cell failed recently, low priority
CELL_FAILED = 1
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
......@@ -86,12 +87,12 @@ class ConnectionPool(object):
def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode())
def iterateForObject(self, object_id, readable=False):
def iterateForObject(self, object_id):
""" Iterate over nodes managing an object """
pt = self.app.pt
if type(object_id) is str:
object_id = pt.getPartition(object_id)
cell_list = pt.getCellList(object_id, readable)
cell_list = pt.getCellList(object_id, True)
if not cell_list:
raise NEOStorageError('no storage available')
getConnForNode = self.getConnForNode
......@@ -106,7 +107,7 @@ class ConnectionPool(object):
node = cell.getNode()
conn = getConnForNode(node)
if conn is not None:
yield node, conn
yield conn
# Re-check if node is running, as our knowledge of its
# state can have changed during connection attempt.
elif node.isRunning():
......
#
# Copyright (C) 2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB.POSException import StorageTransactionError
from neo.lib.connection import ConnectionClosed
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import Packets
from .exception import NEOStorageError
@apply
class _WakeupPacket(object):
handler_method_name = 'pong'
decode = tuple
getId = int
class Transaction(object):
cache_size = 0 # size of data in cache_dict
data_size = 0 # size of data in data_dict
error = None
locking_tid = None
voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess
def __init__(self, txn):
self.queue = SimpleQueue()
self.txn = txn
# data being stored
self.data_dict = {} # {oid: (value, [node_id])}
# data stored: this will go to the cache on tpc_finish
self.cache_dict = {} # {oid: value}
# conflicts to resolve
self.conflict_dict = {} # {oid: (base_serial, serial)}
# resolved conflicts
self.resolved_dict = {} # {oid: serial}
# Keys are node ids instead of Node objects because a node may
# disappear from the cluster. In any case, we always have to check
# if the id is still known by the NodeManager.
# status: 0 -> check only, 1 -> store, 2 -> failed
self.involved_nodes = {} # {node_id: status}
def wakeup(self, conn):
self.queue.put((conn, _WakeupPacket, {}))
def write(self, app, packet, object_id, store=1, **kw):
uuid_list = []
pt = app.pt
involved = self.involved_nodes
object_id = pt.getPartition(object_id)
for cell in pt.getCellList(object_id):
node = cell.getNode()
uuid = node.getUUID()
status = involved.get(uuid, -1)
if status < store:
involved[uuid] = store
elif status > 1:
continue
conn = app.cp.getConnForNode(node)
if conn is not None:
try:
if status < 0 and self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is because
# we don't need that for transaction metadata.
conn.ask(Packets.AskRebaseTransaction(
self.ttid, self.locking_tid), queue=self.queue)
conn.ask(packet, queue=self.queue, **kw)
uuid_list.append(uuid)
continue
except ConnectionClosed:
pass
involved[uuid] = 2
if uuid_list:
return uuid_list
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
def written(self, app, uuid, oid):
# When a node is being disconnected by the master because it was
# not part of the transaction that caused a conflict, we may receive a
# positive answer (not to be confused with lockless stores) before the
# conflict. Because we have no way to identify such a case, we must keep
# the data in self.data_dict until all nodes have answered, so we remain
# able to resolve conflicts.
try:
data, uuid_list = self.data_dict[oid]
uuid_list.remove(uuid)
except KeyError:
# 1. store to S1 and S2
# 2. S2 reports a conflict
# 3. store to S1 and S2 # conflict resolution
# 4. S1 does not report a conflict (lockless)
# 5. S2 answers before S1 for the second store
return
except ValueError:
# The most common case for this exception is because nodeLost()
# tries all oids blindly. Other possible cases:
# - like above (KeyError), but with S2 answering last
# - answer to resolved conflict before the first answer from a
# node that was being disconnected by the master
return
if uuid_list:
return
del self.data_dict[oid]
if type(data) is str:
size = len(data)
self.data_size -= size
size += self.cache_size
if size < app._cache._max_size:
self.cache_size = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
self.cache_dict[oid] = data
def nodeLost(self, app, uuid):
self.involved_nodes[uuid] = 2
for oid in list(self.data_dict):
self.written(app, uuid, oid)
class TransactionContainer(dict):
# IDEA: Drop this container and use the new set_data/data API on
# transactions (requires transaction >= 1.6).
def pop(self, txn):
return dict.pop(self, id(txn), None)
def get(self, txn):
try:
return self[id(txn)]
except KeyError:
raise StorageTransactionError("unknown transaction %r" % txn)
def new(self, txn):
key = id(txn)
if key in self:
raise StorageTransactionError("commit of transaction %r"
" already started" % txn)
context = self[key] = Transaction(txn)
return context
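A hypothetical usage sketch of the container above; it is keyed by id(txn)
so any transaction object can be mapped back to its NEO context:

    txns = TransactionContainer()
    class T(object): pass
    txn = T()
    context = txns.new(txn)          # creates and registers a Transaction
    assert txns.get(txn) is context
    assert txns.pop(txn) is context  # gone once the commit ends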
......@@ -38,6 +38,7 @@ class BootstrapManager(EventHandler):
self.num_replicas = None
self.num_partitions = None
self.current = None
app.nm.reset()
uuid = property(lambda self: self.app.uuid)
......
......@@ -19,17 +19,15 @@ from .locking import Lock, Empty
EMPTY = {}
NOBODY = []
class ForgottenPacket(object):
"""
Instances of this class will be pushed to queue when an expected answer
is being forgotten. Its purpose is similar to pushing "None" when
connection is closed, but the meaning is different.
"""
def __init__(self, msg_id):
self.msg_id = msg_id
def getId(self):
return self.msg_id
@apply
class _ConnectionClosed(object):
handler_method_name = 'connectionClosed'
decode = tuple
class getId(object):
def __eq__(self, other):
return True
def giant_lock(func):
def wrapped(self, *args, **kw):
......@@ -88,7 +86,7 @@ class Dispatcher:
def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock
threads excepting responses from that connection """
threads expecting responses from that connection """
self.lock_acquire()
try:
message_table = self.message_table.pop(id(conn), EMPTY)
......@@ -101,25 +99,10 @@ class Dispatcher:
continue
queue_id = id(queue)
if queue_id not in notified_set:
queue.put((conn, None, None))
queue.put((conn, _ConnectionClosed, EMPTY))
notified_set.add(queue_id)
_decrefQueue(queue)
@giant_lock
def forget(self, conn, msg_id):
""" Forget about a specific message for a specific connection.
Actually makes it "expected by nobody", so we know we can ignore it,
and not detect it as an error. """
message_table = self.message_table[id(conn)]
queue = message_table[msg_id]
if queue is NOBODY:
raise KeyError, 'Already expected by NOBODY: %r, %r' % (
conn, msg_id)
queue.put((conn, ForgottenPacket(msg_id), None))
self.queue_dict[id(queue)] -= 1
message_table[msg_id] = NOBODY
return queue
@giant_lock
def forget_queue(self, queue, flush_queue=True):
"""
......@@ -137,9 +120,7 @@ class Dispatcher:
found += 1
message_table[msg_id] = NOBODY
refcount = self.queue_dict.pop(id(queue), 0)
if refcount != found:
raise ValueError('We hit a refcount bug: %s queue uses ' \
'expected, %s found' % (refcount, found))
assert refcount == found, (refcount, found)
if flush_queue:
get = queue.get
while True:
......
......@@ -15,7 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from collections import deque
from . import logging
from .connection import ConnectionClosed
from .protocol import (
NodeStates, Packets, Errors, BackendNotImplemented,
BrokenNodeDisallowedError, NotReadyError, PacketMalformedError,
......@@ -23,6 +25,10 @@ from .protocol import (
from .util import cached_property
class DelayEvent(Exception):
pass
class EventHandler(object):
"""This class handles events."""
......@@ -64,6 +70,9 @@ class EventHandler(object):
raise UnexpectedPacketError('no handler found')
args = packet.decode() or ()
method(conn, *args, **kw)
except DelayEvent:
assert not kw, kw
self.getEventQueue().queueEvent(method, conn, args)
except UnexpectedPacketError, e:
if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args)
......@@ -165,9 +174,9 @@ class EventHandler(object):
return
conn.close()
def notifyNodeInformation(self, conn, node_list):
def notifyNodeInformation(self, conn, *args):
app = self.app
app.nm.update(app, node_list)
app.nm.update(app, *args)
def ping(self, conn):
conn.answer(Packets.Pong())
......@@ -207,9 +216,6 @@ class EventHandler(object):
def brokenNodeDisallowedError(self, conn, message):
raise RuntimeError, 'broken node disallowed error: %s' % (message,)
def alreadyPendingError(self, conn, message):
logging.error('already pending error: %s', message)
def ack(self, conn, message):
logging.debug("no error message: %s", message)
......@@ -264,3 +270,80 @@ class AnswerBaseHandler(EventHandler):
def acceptIdentification(*args):
pass
def connectionClosed(self, conn):
raise ConnectionClosed
class _DelayedConnectionEvent(EventHandler):
handler_method_name = '_func'
__new__ = object.__new__
def __init__(self, func, conn, args):
self._args = args
self._conn = conn
self._func = func
self._msg_id = conn.getPeerId()
def __call__(self):
conn = self._conn
if not conn.isClosed():
msg_id = conn.getPeerId()
try:
self.dispatch(conn, self)
finally:
conn.setPeerId(msg_id)
def __repr__(self):
return '<%s: 0x%x %s>' % (self._func.__name__, self._msg_id, self._conn)
def decode(self):
return self._args
def getEventQueue(self):
raise
def getId(self):
return self._msg_id
class EventQueue(object):
def __init__(self):
self._event_queue = deque()
self._executing_event = -1
def queueEvent(self, func, conn=None, args=()):
self._event_queue.append(func if conn is None else
_DelayedConnectionEvent(func, conn, args))
def executeQueuedEvents(self):
# Not reentrant. When processing a queued event, calling this method
# only tells the caller to retry all events from the beginning, because
# events for the same connection must be processed in chronological
# order.
self._executing_event += 1
if self._executing_event:
return
queue = self._event_queue
n = len(queue)
while n:
try:
queue[0]()
except DelayEvent:
queue.rotate(-1)
else:
del queue[0]
n -= 1
if self._executing_event:
self._executing_event = 0
queue.rotate(-n)
n = len(queue)
self._executing_event = -1
def logQueuedEvents(self):
if self._event_queue:
logging.info(" Pending events:")
for event in self._event_queue:
logging.info(' %r', event)
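A hypothetical standalone illustration of the retry semantics described in
the comment of executeQueuedEvents above: a queued callable that raises
DelayEvent stays in the queue and is retried on a later pass, anything else
is consumed.

    queue = EventQueue()
    attempts = []

    def flaky():
        attempts.append(1)
        if len(attempts) < 2:
            raise DelayEvent  # not ready yet: stay queued

    queue.queueEvent(flaky)
    queue.executeQueuedEvents()  # flaky delays itself and stays queued
    queue.executeQueuedEvents()  # flaky succeeds and is removed
    assert len(attempts) == 2 and not queue._event_queue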
......@@ -19,8 +19,9 @@ from os.path import exists, getsize
import json
from . import attributeTracker, logging
from .handler import DelayEvent, EventQueue
from .protocol import formatNodeList, uuid_str, \
NodeTypes, NodeStates, ProtocolError
NodeTypes, NodeStates, NotReadyError, ProtocolError
class Node(object):
......@@ -232,7 +233,7 @@ class MasterDB(object):
def __iter__(self):
return iter(self._set)
class NodeManager(object):
class NodeManager(EventQueue):
"""This class manages node status."""
_master_db = None
......@@ -255,9 +256,14 @@ class NodeManager(object):
self._master_db = db = MasterDB(master_db)
for addr in db:
self.createMaster(address=addr)
self.reset()
close = __init__
def reset(self):
EventQueue.__init__(self)
self._timestamp = 0
def add(self, node):
if node in self._node_set:
logging.warning('adding a known node %r, ignoring', node)
......@@ -350,10 +356,23 @@ class NodeManager(object):
return self._address_dict.get(address, None)
def getByUUID(self, uuid, *id_timestamp):
""" Return the node that match with a given UUID """
"""Return the node that matches with a given UUID
If an id timestamp is passed, DelayEvent is raised if identification
must be delayed. This is because we rely only on the notifications from
the master to recognize nodes (otherwise, we could get id conflicts)
and such notifications may be late in some cases, even when the master
expects us to not reject the connection.
"""
node = self._uuid_dict.get(uuid)
if not id_timestamp or node and (node.id_timestamp,) == id_timestamp:
return node
if id_timestamp:
id_timestamp, = id_timestamp
if not node or node.id_timestamp != id_timestamp:
if self._timestamp < id_timestamp:
raise DelayEvent
# The peer got disconnected from the master.
raise NotReadyError('unknown by master')
return node
def _createNode(self, klass, address=None, uuid=None, **kw):
by_address = self.getByAddress(address)
......@@ -389,7 +408,9 @@ class NodeManager(object):
def createFromNodeType(self, node_type, **kw):
return self._createNode(NODE_TYPE_MAPPING[node_type], **kw)
def update(self, app, node_list):
def update(self, app, timestamp, node_list):
assert self._timestamp < timestamp, (self._timestamp, timestamp)
self._timestamp = timestamp
node_set = self._node_set.copy() if app.id_timestamp is None else None
for node_type, addr, uuid, state, id_timestamp in node_list:
# This should be done here (although klass might not be used in this
......@@ -443,12 +464,14 @@ class NodeManager(object):
for node in node_set - self._node_set:
self.remove(node)
self.log()
self.executeQueuedEvents()
def log(self):
logging.info('Node manager : %u nodes', len(self._node_set))
if self._node_set:
logging.info('\n'.join(formatNodeList(
map(Node.asTuple, self._node_set), ' * ')))
self.logQueuedEvents()
@apply
def NODE_TYPE_MAPPING():
......
......@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO
from struct import Struct
PROTOCOL_VERSION = 9
PROTOCOL_VERSION = 10
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -71,11 +71,11 @@ def ErrorCodes():
OID_DOES_NOT_EXIST
PROTOCOL_ERROR
BROKEN_NODE
ALREADY_PENDING
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
READ_ONLY_ACCESS
INCOMPLETE_TRANSACTION
@Enum
def ClusterStates():
......@@ -146,12 +146,6 @@ def CellStates():
# readable nor writable.
CORRUPTED
@Enum
def LockState():
NOT_LOCKED
GRANTED
GRANTED_TO_OTHER
# used for logging
node_state_prefix_dict = {
NodeStates.RUNNING: 'R',
......@@ -404,6 +398,19 @@ class PStructItemOrNone(PStructItem):
value = reader(self.size)
return None if value == self._None else self.unpack(value)[0]
class POption(PStruct):
def _encode(self, writer, value):
if value is None:
writer('\0')
else:
writer('\1')
PStruct._encode(self, writer, value)
def _decode(self, reader):
if '\0\1'.index(reader(1)):
return PStruct._decode(self, reader)
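# A hypothetical standalone sketch (not part of this module) of the framing
# POption implements: one flag byte tells the decoder whether the optional
# struct follows; any byte other than '\0'/'\1' makes '\0\1'.index raise
# ValueError, i.e. a malformed packet.
def _sketch_encode_option(write, value, encode_body):
    if value is None:
        write('\0')
    else:
        write('\1')
        encode_body(write, value)

def _sketch_decode_option(read, decode_body):
    if '\0\1'.index(read(1)):
        return decode_body(read)
    # '\0' -> the option is absent -> None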
class PList(PStructItem):
"""
A list of homogeneous items
......@@ -869,6 +876,18 @@ class BeginTransaction(Packet):
PTID('tid'),
)
class FailedVote(Packet):
"""
Report storage nodes for which vote failed. C -> M
True is returned if it's still possible to finish the transaction.
"""
_fmt = PStruct('failed_vote',
PTID('tid'),
PFUUIDList,
)
_answer = Error
class FinishTransaction(Packet):
"""
Finish a transaction. C -> PM.
......@@ -943,14 +962,60 @@ class GenerateOIDs(Packet):
PFOidList,
)
class Deadlock(Packet):
"""
Ask master to generate a new TTID that will be used by the client
to rebase a transaction. S -> PM -> C
"""
_fmt = PStruct('notify_deadlock',
PTID('ttid'),
PTID('locking_tid'),
)
class RebaseTransaction(Packet):
"""
Rebase transaction. C -> S.
"""
_fmt = PStruct('ask_rebase_transaction',
PTID('ttid'),
PTID('locking_tid'),
)
_answer = PStruct('answer_rebase_transaction',
PFOidList,
)
class RebaseObject(Packet):
"""
Rebase object. C -> S.
XXX: It is a request packet to simplify the implementation. For more
efficiency, this should be turned into a notification, and the
RebaseTransaction should be answered once all objects are rebased
(so that the client can still wait on something).
"""
_fmt = PStruct('ask_rebase_object',
PTID('ttid'),
PTID('oid'),
)
_answer = PStruct('answer_rebase_object',
POption('conflict',
PTID('serial'),
PTID('conflict_serial'),
POption('data',
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
),
)
)
class StoreObject(Packet):
"""
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
Answer if an object has been stored. If an object is in conflict,
a serial of the conflicting transaction is returned. In this case,
if this serial is newer than the current transaction ID, a client
node must not try to resolve the conflict. S -> C.
As for IStorage, 'serial' is ZERO_TID for new objects.
"""
_fmt = PStruct('ask_store_object',
POID('oid'),
......@@ -960,21 +1025,19 @@ class StoreObject(Packet):
PString('data'),
PTID('data_serial'),
PTID('tid'),
PBoolean('unlock'),
)
_answer = PStruct('answer_store_object',
PBoolean('conflicting'),
POID('oid'),
PTID('serial'),
PTID('conflict'),
)
class AbortTransaction(Packet):
"""
Abort a transaction. C -> S, PM.
Abort a transaction. C -> PM -> S.
"""
_fmt = PStruct('abort_transaction',
PTID('tid'),
PFUUIDList, # unused for PM -> S
)
class StoreTransaction(Packet):
......@@ -1158,6 +1221,7 @@ class NotifyNodeInformation(Packet):
Notify information about one or more nodes. PM -> Any.
"""
_fmt = PStruct('notify_node_informations',
PFloat('id_timestamp'),
PFNodeList,
)
......@@ -1243,22 +1307,6 @@ class ObjectUndoSerial(Packet):
),
)
class HasLock(Packet):
"""
Ask a storage if oid is locked by another transaction.
C -> S
Answer whether a transaction holds the write lock for requested object.
"""
_fmt = PStruct('has_load_lock',
PTID('tid'),
POID('oid'),
)
_answer = PStruct('answer_has_lock',
POID('oid'),
PEnum('lock_state', LockState),
)
class CheckCurrentSerial(Packet):
"""
Verifies if given serial is current for object oid in the database, and
......@@ -1270,16 +1318,12 @@ class CheckCurrentSerial(Packet):
"""
_fmt = PStruct('ask_check_current_serial',
PTID('tid'),
PTID('serial'),
POID('oid'),
)
_answer = PStruct('answer_store_object',
PBoolean('conflicting'),
POID('oid'),
PTID('serial'),
)
_answer = StoreObject._answer
class Pack(Packet):
"""
Request a pack at given TID.
......@@ -1661,6 +1705,8 @@ class Packets(dict):
ValidateTransaction)
AskBeginTransaction, AnswerBeginTransaction = register(
BeginTransaction)
FailedVote = register(
FailedVote)
AskFinishTransaction, AnswerTransactionFinished = register(
FinishTransaction, ignore_when_closed=False)
AskLockInformation, AnswerInformationLocked = register(
......@@ -1671,6 +1717,12 @@ class Packets(dict):
UnlockInformation)
AskNewOIDs, AnswerNewOIDs = register(
GenerateOIDs)
NotifyDeadlock = register(
Deadlock)
AskRebaseTransaction, AnswerRebaseTransaction = register(
RebaseTransaction)
AskRebaseObject, AnswerRebaseObject = register(
RebaseObject)
AskStoreObject, AnswerStoreObject = register(
StoreObject)
AbortTransaction = register(
......@@ -1709,8 +1761,6 @@ class Packets(dict):
ClusterState)
AskObjectUndoSerial, AnswerObjectUndoSerial = register(
ObjectUndoSerial)
AskHasLock, AnswerHasLock = register(
HasLock)
AskTIDsFrom, AnswerTIDsFrom = register(
TIDListFrom)
AskPack, AnswerPack = register(
......@@ -1780,3 +1830,8 @@ def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)):
for i in xrange(len(node_list[0]) - 1))
return map((prefix + t + '%s').__mod__, node_list)
return ()
NotifyNodeInformation._neolog = staticmethod(lambda timestamp, node_list:
((timestamp,), formatNodeList(node_list, ' ! ')))
Error._neolog = staticmethod(lambda *args: ((), ("%s (%s)" % args,)))
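# Note (illustrative summary, not code from the commit): a packet's
# _neolog(*decoded_args) hook returns (args, extra_lines), where 'args' is
# what neolog may still repr and 'extra_lines' are appended to the log
# message; see the neolog changes further below.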
......@@ -258,15 +258,16 @@ class PartitionTable(object):
partition on the line (here, line length is 11 to keep the docstring
width under 80 columns).
"""
node_list = sorted(self.count_dict)
result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
protocol.node_state_prefix_dict[node.getState()])
for i, node in enumerate(sorted(self.count_dict))]
for i, node in enumerate(node_list)]
append = result.append
line = []
max_line_len = 20 # XXX: hardcoded number of partitions per line
prefix = 0
prefix_len = int(math.ceil(math.log10(self.np)))
for offset, row in enumerate(self.formatRows()):
for offset, row in enumerate(self._formatRows(node_list)):
if len(line) == max_line_len:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
line = []
......@@ -276,8 +277,7 @@ class PartitionTable(object):
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
return result
def formatRows(self):
node_list = sorted(self.count_dict)
def _formatRows(self, node_list):
cell_state_dict = protocol.cell_state_prefix_dict
for row in self.partition_list:
if row is None:
......@@ -287,13 +287,15 @@ class PartitionTable(object):
for x in row}
yield ''.join(cell_dict.get(x, '.') for x in node_list)
def operational(self):
def operational(self, exclude_list=()):
if not self.filled():
return False
for row in self.partition_list:
for cell in row:
if cell.isReadable() and cell.getNode().isRunning():
break
if cell.isReadable():
node = cell.getNode()
if node.isRunning() and node.getUUID() not in exclude_list:
break
else:
return False
return True
......
......@@ -17,9 +17,8 @@
import thread, threading, weakref
from . import logging
from .app import BaseApplication
from .connection import ConnectionClosed
from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher, ForgottenPacket
from .dispatcher import Dispatcher
from .locking import SimpleQueue
class app_set(weakref.WeakSet):
......@@ -141,17 +140,8 @@ class ThreadedApplication(BaseApplication):
_handlePacket = self._handlePacket
while True:
qconn, qpacket, kw = get(True)
is_forgotten = isinstance(qpacket, ForgottenPacket)
if conn is qconn:
# check fake packet
if qpacket is None:
raise ConnectionClosed
if msg_id == qpacket.getId():
if is_forgotten:
raise ValueError, 'ForgottenPacket for an ' \
'explicitly expected packet.'
_handlePacket(qconn, qpacket, kw, handler)
break
if not is_forgotten and qpacket is not None:
_handlePacket(qconn, qpacket, kw)
if conn is qconn and msg_id == qpacket.getId():
_handlePacket(qconn, qpacket, kw, handler)
break
_handlePacket(qconn, qpacket, kw)
return self.getHandlerData()
......@@ -29,6 +29,16 @@ from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation
class StateChangedException(Exception): pass
_previous_time = 0
def monotonic_time():
global _previous_time
now = time()
if _previous_time < now:
_previous_time = now
else:
_previous_time = now = _previous_time + 1e-3
return now
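# For illustration (not in the original file): two immediate calls always
# return strictly increasing values, even if time() stalls or goes
# backwards, because the fallback branch adds 1 millisecond:
#   t1 = monotonic_time(); t2 = monotonic_time()
#   assert t1 < t2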
from .backup_app import BackupApplication
from .handlers import election, identification, secondary
from .handlers import administration, client, storage
......@@ -41,6 +51,7 @@ from .verification import VerificationManager
class Application(BaseApplication):
"""The master node application."""
packing = None
storage_readiness = 0
# Latest completely committed TID
last_transaction = ZERO_TID
backup_tid = None
......@@ -56,7 +67,7 @@ class Application(BaseApplication):
self.server = config.getBind()
self.autostart = config.getAutostart()
self.storage_readiness = set()
self.storage_ready_dict = {}
for master_address in config.getMasters():
self.nm.createMaster(address=master_address)
......@@ -240,11 +251,12 @@ class Application(BaseApplication):
continue
node_dict[NodeTypes.MASTER].append(node_info)
now = monotonic_time()
# send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType())
if node_list and node.isRunning() and node is not exclude:
node.notify(Packets.NotifyNodeInformation(node_list))
node.notify(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
"""Broadcast a Notify Partition Changes packet."""
......@@ -398,6 +410,7 @@ class Application(BaseApplication):
conn.close()
# Reconnect to primary master node.
self.nm.reset()
primary_handler = secondary.PrimaryHandler(self)
ClientConnection(self, primary_handler, self.primary_master_node)
......@@ -491,11 +504,12 @@ class Application(BaseApplication):
logging.info("asking remaining nodes to shutdown")
handler = EventHandler(self)
now = monotonic_time()
for node in self.nm.getConnectedList():
conn = node.getConnection()
if node.isStorage():
conn.setHandler(handler)
conn.notify(Packets.NotifyNodeInformation(((
conn.notify(Packets.NotifyNodeInformation(now, ((
node.getType(), node.getAddress(), node.getUUID(),
NodeStates.TEMPORARILY_DOWN, None),)))
conn.abort()
......@@ -561,11 +575,16 @@ class Application(BaseApplication):
self.last_transaction = tid
def setStorageNotReady(self, uuid):
self.storage_readiness.discard(uuid)
self.storage_ready_dict.pop(uuid, None)
def setStorageReady(self, uuid):
self.storage_readiness.add(uuid)
if uuid not in self.storage_ready_dict:
self.storage_readiness = self.storage_ready_dict[uuid] = \
self.storage_readiness + 1
def isStorageReady(self, uuid):
return uuid in self.storage_readiness
return uuid in self.storage_ready_dict
def getStorageReadySet(self, readiness=float('inf')):
return {k for k, v in self.storage_ready_dict.iteritems()
if v <= readiness}
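A hypothetical standalone sketch of the readiness ordering implemented
above: each storage gets an increasing counter when it becomes ready, so
getStorageReadySet(readiness) only returns nodes that were already ready at
a given point, e.g. when a transaction began:

    counter = 0
    ready_dict = {}  # {uuid: readiness counter}

    def set_ready(uuid):
        global counter
        if uuid not in ready_dict:
            counter += 1
            ready_dict[uuid] = counter

    set_ready('S1')
    begin = counter     # a transaction begins: only S1 is ready
    set_ready('S2')     # S2 becomes ready afterwards
    assert {k for k, v in ready_dict.iteritems() if v <= begin} == {'S1'}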
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..app import monotonic_time
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import EventHandler
......@@ -88,7 +89,7 @@ class MasterHandler(EventHandler):
node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getClientList())
node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(node_list))
conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askPartitionTable(self, conn):
pt = self.app.pt
......
......@@ -17,7 +17,7 @@
import random
from . import MasterHandler
from ..app import StateChangedException
from ..app import monotonic_time, StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.pt import PartitionTableException
......@@ -103,7 +103,8 @@ class AdministrationHandler(MasterHandler):
node.setState(state)
if node.isConnected():
# notify itself so it can shutdown
node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
node.notify(Packets.NotifyNodeInformation(
monotonic_time(), [node.asTuple()]))
# close to avoid handle the closure as a connection lost
node.getConnection().abort()
if keep:
......@@ -121,7 +122,8 @@ class AdministrationHandler(MasterHandler):
# ignores non-running nodes
assert not node.isRunning()
if node.isConnected():
node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
node.notify(Packets.NotifyNodeInformation(
monotonic_time(), [node.asTuple()]))
app.broadcastNodesInformation([node])
def addPendingNodes(self, conn, uuid_list):
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors
from ..app import monotonic_time
from . import MasterHandler
class ClientServiceHandler(MasterHandler):
......@@ -36,7 +37,7 @@ class ClientServiceHandler(MasterHandler):
node_list = [nm.getByUUID(conn.getUUID()).asTuple()] # for id_timestamp
node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(node_list))
conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list))
def askBeginTransaction(self, conn, tid):
"""
......@@ -44,46 +45,42 @@ class ClientServiceHandler(MasterHandler):
"""
app = self.app
node = app.nm.getByUUID(conn.getUUID())
conn.answer(Packets.AnswerBeginTransaction(app.tm.begin(node, tid)))
tid = app.tm.begin(node, app.storage_readiness, tid)
conn.answer(Packets.AnswerBeginTransaction(tid))
def askNewOIDs(self, conn, num_oids):
conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
def getEventQueue(self):
# for failedVote
return self.app.tm
def failedVote(self, conn, *args):
app = self.app
conn.answer((Errors.Ack if app.tm.vote(app, *args) else
Errors.IncompleteTransaction)())
def askFinishTransaction(self, conn, ttid, oid_list, checked_list):
app = self.app
pt = app.pt
# Collect partitions related to this transaction.
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid))
# Collect the UUIDs of nodes related to this transaction.
uuid_list = filter(app.isStorageReady, {cell.getUUID()
for part in partition_set
for cell in pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN})
if not uuid_list:
raise ProtocolError('No storage node ready for transaction')
identified_node_list = app.nm.getIdentifiedList(pool_set=set(uuid_list))
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
p = Packets.AskLockInformation(
tid, node_list = app.tm.prepare(
app,
ttid,
app.tm.prepare(
ttid,
pt.getPartitions(),
oid_list,
{x.getUUID() for x in identified_node_list},
conn.getPeerId(),
),
oid_list,
checked_list,
conn.getPeerId(),
)
for node in identified_node_list:
node.ask(p, timeout=60)
if tid:
p = Packets.AskLockInformation(ttid, tid)
for node in node_list:
node.ask(p, timeout=60)
else:
conn.answer(Errors.IncompleteTransaction())
# It's simpler to abort automatically rather than asking the client
# to send a notification on tpc_abort, since it would have kept the
# transaction longer in the list of transactions.
# This should happen so rarely that we don't try to minimize the
# number of abort notifications by looking at the modified partitions.
self.abortTransaction(conn, ttid, app.getStorageReadySet())
def askFinalTID(self, conn, ttid):
tm = self.app.tm
......@@ -112,9 +109,24 @@ class ClientServiceHandler(MasterHandler):
else:
conn.answer(Packets.AnswerPack(False))
def abortTransaction(self, conn, tid):
# BUG: The replicator may wait this transaction to be finished.
self.app.tm.abort(tid, conn.getUUID())
def abortTransaction(self, conn, tid, uuid_list):
# Consider a failure when the connection between the storage and the
# client breaks while the answer to the first write is sent back.
# In other words, the client can not know the exact set of nodes that
# know this transaction, and it sends us all nodes it considered for
# writing.
# We must also add those that are waiting for this transaction to be
# finished (returned by tm.abort), because they may have joined the
# cluster after the client started to abort.
app = self.app
involved = app.tm.abort(tid, conn.getUUID())
involved.update(uuid_list)
involved.intersection_update(app.getStorageReadySet())
if involved:
p = Packets.AbortTransaction(tid, ())
getByUUID = app.nm.getByUUID
for involved in involved:
getByUUID(involved).notify(p)
# like ClientServiceHandler but read-only & only for tid <= backup_tid
......
......@@ -56,7 +56,7 @@ class BaseElectionHandler(EventHandler):
class ClientElectionHandler(BaseElectionHandler):
def notifyNodeInformation(self, conn, node_list):
def notifyNodeInformation(self, conn, timestamp, node_list):
# XXX: For the moment, do nothing because
# we'll close this connection and reconnect.
pass
......
......@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import time
from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, ProtocolError, uuid_str
from ..app import monotonic_time
from . import MasterHandler
class IdentificationHandler(MasterHandler):
......@@ -92,7 +92,7 @@ class IdentificationHandler(MasterHandler):
uuid=uuid, address=address)
else:
node.setUUID(uuid)
node.id_timestamp = time()
node.id_timestamp = monotonic_time()
node.setState(state)
node.setConnection(conn)
conn.setHandler(handler)
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from ..app import monotonic_time
from . import MasterHandler
from neo.lib.handler import EventHandler
from neo.lib.exception import ElectionFailure, PrimaryFailure
......@@ -38,7 +39,7 @@ class SecondaryMasterHandler(MasterHandler):
def _notifyNodeInformation(self, conn):
node_list = [n.asTuple() for n in self.app.nm.getMasterList()]
conn.notify(Packets.NotifyNodeInformation(node_list))
conn.notify(Packets.NotifyNodeInformation(monotonic_time(), node_list))
class PrimaryHandler(EventHandler):
""" Handler used by secondaries to handle primary master"""
......@@ -72,8 +73,9 @@ class PrimaryHandler(EventHandler):
def notifyClusterInformation(self, conn, state):
self.app.cluster_state = state
def notifyNodeInformation(self, conn, node_list):
super(PrimaryHandler, self).notifyNodeInformation(conn, node_list)
def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
for node_type, _, uuid, state, _ in node_list:
assert node_type == NodeTypes.MASTER, node_type
if uuid == self.app.uuid and state == NodeStates.UNKNOWN:
......
......@@ -26,18 +26,18 @@ class StorageServiceHandler(BaseServiceHandler):
def connectionCompleted(self, conn, new):
app = self.app
uuid = conn.getUUID()
app.setStorageNotReady(uuid)
if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new)
if app.nm.getByUUID(uuid).isRunning(): # node may be PENDING
if app.nm.getByUUID(conn.getUUID()).isRunning(): # node may be PENDING
conn.notify(Packets.StartOperation(app.backup_tid))
def connectionLost(self, conn, new_state):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
uuid = conn.getUUID()
node = app.nm.getByUUID(uuid)
super(StorageServiceHandler, self).connectionLost(conn, new_state)
app.tm.storageLost(conn.getUUID())
app.setStorageNotReady(uuid)
app.tm.storageLost(uuid)
if (app.getClusterState() == ClusterStates.BACKINGUP
# Also check if we're exiting, because backup_app is not usable
# in this case. Maybe cluster state should be set to something
......@@ -61,6 +61,9 @@ class StorageServiceHandler(BaseServiceHandler):
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
conn.answer(p)
def notifyDeadlock(self, conn, *args):
self.app.tm.deadlock(conn.getUUID(), *args)
def answerInformationLocked(self, conn, ttid):
self.app.tm.lock(ttid, conn.getUUID())
......
......@@ -16,6 +16,7 @@
from neo.lib import logging
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from .app import monotonic_time
from .handlers import MasterHandler
......@@ -170,8 +171,9 @@ class RecoveryManager(MasterHandler):
new_nodes = app.pt.load(ptid, row_list, app.nm)
except IndexError:
raise ProtocolError('Invalid offset')
self._notifyAdmins(Packets.NotifyNodeInformation(new_nodes),
Packets.SendPartitionTable(ptid, row_list))
self._notifyAdmins(
Packets.NotifyNodeInformation(monotonic_time(), new_nodes),
Packets.SendPartitionTable(ptid, row_list))
self.ask_pt = ()
uuid = conn.getUUID()
app.backup_tid = self.backup_tid_dict[uuid]
......
......@@ -18,29 +18,31 @@ from collections import deque
from time import time
from struct import pack, unpack
from neo.lib import logging
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID
from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.protocol import Packets, ProtocolError, uuid_str, \
ZERO_OID, ZERO_TID
from neo.lib.util import dump, u64, addTID, tidFromTime
class DelayedError(Exception):
pass
class Transaction(object):
"""
A pending transaction
"""
locking_tid = ZERO_TID
_tid = None
_msg_id = None
_oid_list = None
_failed = frozenset()
_prepared = False
# uuid dict holds a flag to know who has locked the transaction
_uuid_set = None
_lock_wait_uuid_set = None
def __init__(self, node, ttid):
def __init__(self, node, storage_readiness, ttid):
"""
Prepare the transaction, set OIDs and UUIDs related to it
"""
self._node = node
self._storage_readiness = storage_readiness
self._ttid = ttid
self._birth = time()
# store storage uuids that must be notified at commit
......@@ -113,13 +115,13 @@ class Transaction(object):
"""
return list(self._notification_set)
def prepare(self, tid, oid_list, uuid_list, msg_id):
def prepare(self, tid, oid_list, uuid_set, msg_id):
self._tid = tid
self._oid_list = oid_list
self._msg_id = msg_id
self._uuid_set = set(uuid_list)
self._lock_wait_uuid_set = set(uuid_list)
self._uuid_set = uuid_set
self._lock_wait_uuid_set = uuid_set.copy()
self._prepared = True
def storageLost(self, uuid):
......@@ -163,7 +165,7 @@ class Transaction(object):
return not self._lock_wait_uuid_set
class TransactionManager(object):
class TransactionManager(EventQueue):
"""
Manage current transactions
"""
......@@ -173,6 +175,7 @@ class TransactionManager(object):
self.reset()
def reset(self):
EventQueue.__init__(self)
# ttid -> transaction
self._ttid_dict = {}
self._last_oid = ZERO_OID
......@@ -195,6 +198,7 @@ class TransactionManager(object):
except ValueError:
pass
del self._ttid_dict[ttid]
self.executeQueuedEvents()
def __contains__(self, ttid):
"""
......@@ -285,7 +289,7 @@ class TransactionManager(object):
txn.registerForNotification(uuid)
return self._ttid_dict.keys()
def begin(self, node, tid=None):
def begin(self, node, storage_readiness, tid=None):
"""
Generate a new TID
"""
......@@ -297,38 +301,116 @@ class TransactionManager(object):
# last TID.
self._queue.append(tid)
self.setLastTID(tid)
txn = self._ttid_dict[tid] = Transaction(node, tid)
txn = self._ttid_dict[tid] = Transaction(node, storage_readiness, tid)
logging.debug('Begin %s', txn)
return tid
def prepare(self, ttid, divisor, oid_list, uuid_list, msg_id):
def deadlock(self, storage_id, ttid, locking_tid):
try:
txn = self._ttid_dict[ttid]
except KeyError:
return
if txn.locking_tid <= locking_tid:
client = txn.getNode()
txn.locking_tid = locking_tid = self._nextTID()
logging.info('Deadlock avoidance triggered by %s for %s:'
' new locking tid for TXN %s is %s', uuid_str(storage_id),
uuid_str(client.getUUID()), dump(ttid), dump(locking_tid))
client.notify(Packets.NotifyDeadlock(ttid, locking_tid))
def vote(self, app, ttid, uuid_list):
"""
Check that the transaction can be voted
when the client reports failed nodes.
"""
txn = self[ttid]
# The client does not know which nodes are not expected to have
# transactions in full. Let's filter them out.
failed = app.getStorageReadySet(txn._storage_readiness)
failed.intersection_update(uuid_list)
if failed:
operational = app.pt.operational
if not operational(failed):
# No way to commit this transaction because there are
# non-replicated storage nodes with failed stores.
return False
failed = failed.copy()
for t in self._ttid_dict.itervalues():
failed |= t._failed
if not operational(failed):
# Other transactions were voted and unless they're aborted,
# we won't be able to finish this one, because that would make
# the cluster non-operational. Let's tell the caller to retry
# later.
raise DelayEvent
# Allow the client to finish the transaction,
# even if it will disconnect storage nodes.
txn._failed = failed
return True
def prepare(self, app, ttid, oid_list, checked_list, msg_id):
"""
Prepare a transaction to be finished
"""
txn = self[ttid]
pt = app.pt
failed = txn._failed
if failed and not pt.operational(failed):
return None, None
ready = app.getStorageReadySet(txn._storage_readiness)
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid))
node_list = []
uuid_set = set()
for partition in partition_set:
for cell in pt.getCellList(partition):
node = cell.getNode()
if node.isIdentified():
uuid = node.getUUID()
if uuid in uuid_set:
continue
if uuid in failed:
# This will commit a new PT with outdated cells before
# locking the transaction, which is important during
# the verification phase.
node.getConnection().close()
elif uuid in ready:
uuid_set.add(uuid)
node_list.append(node)
# A node that was not ready at the beginning of the transaction
# can't have readable cells. And if we're still operational without
# the 'failed' nodes, then there must still be 1 node in 'ready'
# that is UP.
assert node_list, (ready, failed)
# maybe not the fastest but _queue should often be small
if ttid in self._queue:
tid = ttid
else:
tid = self._nextTID(ttid, divisor)
tid = self._nextTID(ttid, pt.getPartitions())
self._queue.append(ttid)
logging.debug('Finish TXN %s for %s (was %s)',
dump(tid), txn.getNode(), dump(ttid))
txn.prepare(tid, oid_list, uuid_list, msg_id)
txn.prepare(tid, oid_list, uuid_set, msg_id)
# check if greater and foreign OID was stored
if oid_list:
self.setLastOID(max(oid_list))
return tid
return tid, node_list
def abort(self, ttid, uuid):
"""
Abort a transaction
"""
logging.debug('Abort TXN %s for %s', dump(ttid), uuid_str(uuid))
if self[ttid].isPrepared():
txn = self[ttid]
if txn.isPrepared():
raise ProtocolError("commit already requested for ttid %s"
% dump(ttid))
del self[ttid]
return txn._notification_set
def lock(self, ttid, uuid):
"""
......@@ -350,7 +432,7 @@ class TransactionManager(object):
for ttid, txn in self._ttid_dict.iteritems():
if txn.storageLost(uuid) and self._queue[0] == ttid:
unlock = True
# do not break: we must call forget() on all transactions
# do not break: we must call storageLost() on all transactions
if unlock:
self._unlockPending()
......@@ -370,6 +452,7 @@ class TransactionManager(object):
break
del queue[0], self._ttid_dict[ttid]
self._on_commit(txn)
self.executeQueuedEvents()
def clientLost(self, node):
for txn in self._ttid_dict.values():
......@@ -380,4 +463,4 @@ class TransactionManager(object):
logging.info('Transactions:')
for txn in self._ttid_dict.itervalues():
logging.info(' %r', txn)
self.logQueuedEvents()
......@@ -20,7 +20,6 @@
import bz2, gzip, errno, optparse, os, signal, sqlite3, sys, time
from bisect import insort
from logging import getLevelName
from functools import partial
comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile)
......@@ -94,11 +93,6 @@ class Log(object):
exec bz2.decompress(text) in g
for x in 'uuid_str', 'Packets', 'PacketMalformedError':
setattr(self, x, g[x])
try:
self.notifyNodeInformation = partial(g['formatNodeList'],
prefix=' ! ')
except KeyError:
self.notifyNodeInformation = None
try:
self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
(date,)).next()
......@@ -131,8 +125,8 @@ class Log(object):
body = None
msg = ['#0x%04x %-30s %s' % (msg_id, msg, peer)]
if body is not None:
logger = getattr(self, p.handler_method_name, None)
if logger or self._decode_all:
log = getattr(p, '_neolog', None)
if log or self._decode_all:
p = p()
p._id = msg_id
p._body = body
......@@ -141,15 +135,13 @@ class Log(object):
except self.PacketMalformedError:
msg.append("Can't decode packet")
else:
if logger:
msg += logger(*args)
elif args:
msg = '%s \t| %r' % (msg[0], args),
if log:
args, extra = log(*args)
msg += extra
if args and self._decode_all:
msg[0] += ' \t| ' + repr(args)
return date, name, 'PACKET', msg
def error(self, code, message):
return "%s (%s)" % (code, message),
def emit_many(log_list):
log_list = [(log, iter(log).next) for log in log_list]
......
......@@ -46,7 +46,6 @@ UNIT_TEST_MODULES = [
'neo.tests.testConnection',
'neo.tests.testHandler',
'neo.tests.testNodes',
'neo.tests.testDispatcher',
'neo.tests.testUtil',
'neo.tests.testPT',
# master application
......
......@@ -28,7 +28,6 @@ from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
from .checker import Checker
from .database import buildDatabaseManager
from .exception import AlreadyPendingError
from .handlers import identification, initialization
from .handlers import master, hidden
from .replicator import Replicator
......@@ -39,13 +38,14 @@ from neo.lib.debug import register as registerLiveDebugger
class Application(BaseApplication):
"""The storage node application."""
tm = None
def __init__(self, config):
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
# set the cluster name
self.name = config.getCluster()
self.tm = TransactionManager(self)
self.dm = buildDatabaseManager(config.getAdapter(),
(config.getDatabase(), config.getEngine(), config.getWait()),
)
......@@ -69,8 +69,6 @@ class Application(BaseApplication):
self.master_node = None
# operation related data
self.event_queue = None
self.event_queue_dict = None
self.operational = False
# ready is True when operational and we got all information
......@@ -95,9 +93,9 @@ class Application(BaseApplication):
def log(self):
self.em.log()
self.logQueuedEvents()
self.nm.log()
self.tm.log()
if self.tm:
self.tm.log()
if self.pt is not None:
self.pt.log()
......@@ -188,9 +186,7 @@ class Application(BaseApplication):
for conn in self.em.getConnectionList():
if conn not in (self.listening_conn, self.master_conn):
conn.close()
# create/clear event queue
self.event_queue = deque()
self.event_queue_dict = {}
self.tm = TransactionManager(self)
try:
self.initialize()
self.doOperation()
......@@ -201,6 +197,7 @@ class Application(BaseApplication):
logging.error('primary master is down: %s', msg)
finally:
self.checker = Checker(self)
del self.tm
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
......@@ -247,8 +244,8 @@ class Application(BaseApplication):
while not self.operational:
_poll()
self.ready = True
self.replicator.populate()
self.master_conn.notify(Packets.NotifyReady())
self.replicator.populate()
def doOperation(self):
"""Handle everything, including replications and transactions."""
......@@ -263,7 +260,6 @@ class Application(BaseApplication):
# Forget all unfinished data.
self.dm.dropUnfinishedData()
self.tm.reset()
self.task_queue = task_queue = deque()
try:
......@@ -308,46 +304,6 @@ class Application(BaseApplication):
if not node.isHidden():
break
def queueEvent(self, some_callable, conn=None, args=(), key=None,
raise_on_duplicate=True):
event_queue_dict = self.event_queue_dict
n = event_queue_dict.get(key)
if n and raise_on_duplicate:
raise AlreadyPendingError()
msg_id = None if conn is None else conn.getPeerId()
self.event_queue.append((key, some_callable, msg_id, conn, args))
if key is not None:
event_queue_dict[key] = n + 1 if n else 1
def executeQueuedEvents(self):
p = self.event_queue.popleft
event_queue_dict = self.event_queue_dict
for _ in xrange(len(self.event_queue)):
key, some_callable, msg_id, conn, args = p()
if key is not None:
n = event_queue_dict[key] - 1
if n:
event_queue_dict[key] = n
else:
del event_queue_dict[key]
if conn is None:
some_callable(*args)
elif not conn.isClosed():
orig_msg_id = conn.getPeerId()
try:
conn.setPeerId(msg_id)
some_callable(conn, *args)
finally:
conn.setPeerId(orig_msg_id)
def logQueuedEvents(self):
if self.event_queue is None:
return
logging.info("Pending events:")
for key, event, _msg_id, _conn, args in self.event_queue:
logging.info(' %r:%r: %r:%r %r %r', key, event.__name__,
_msg_id, _conn, args)
def newTask(self, iterator):
try:
iterator.next()
......
......@@ -109,7 +109,7 @@ class Checker(object):
self.source = source
def start():
if app.tm.isLockedTid(max_tid):
app.queueEvent(start)
app.tm.queueEvent(start)
return
args = partition, CHECK_COUNT, min_tid, max_tid
p = Packets.AskCheckTIDRange(*args)
......
......@@ -304,7 +304,7 @@ class ImporterDatabaseManager(DatabaseManager):
getPartitionTable changePartitionTable
getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction
storeData getOrphanList _pruneData deferCommit
loadData storeData getOrphanList _pruneData deferCommit
""".split():
setattr(self, x, getattr(self.db, x))
......
......@@ -463,6 +463,11 @@ class DatabaseManager(object):
no hash collision.
"""
@abstract
def loadData(self, data_id):
"""Inverse of storeData
"""
def holdData(self, checksum_or_id, *args):
"""Store raw data of temporary object
......
......@@ -541,6 +541,15 @@ class MySQLDatabaseManager(DatabaseManager):
raise
return self.conn.insert_id()
def loadData(self, data_id):
    compression, hash, value = self.query(
        "SELECT compression, hash, value FROM data WHERE id=%s"
        % data_id)[0]
    if compression and compression & 0x80:
        # the MSB of 'compression' flags a value split over several rows
        compression &= 0x7f
        value = ''.join(self._bigData(value))
    return compression, hash, value
del _structLL
def _getDataTID(self, oid, tid=None, before_tid=None):
......
......@@ -404,6 +404,10 @@ class SQLiteDatabaseManager(DatabaseManager):
return r
raise
def loadData(self, data_id):
return self.query("SELECT compression, hash, value"
    " FROM data WHERE id=?", (data_id,)).fetchone()
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getPartition(oid)
sql = 'SELECT tid, value_tid FROM obj' \
......
#
# Copyright (C) 2010-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class AlreadyPendingError(Exception):
pass
......@@ -36,10 +36,11 @@ class BaseMasterHandler(EventHandler):
def notifyClusterInformation(self, conn, state):
self.app.changeClusterState(state)
def notifyNodeInformation(self, conn, node_list):
def notifyNodeInformation(self, conn, timestamp, node_list):
"""Store information on nodes, only if this is sent by a primary
master node."""
super(BaseMasterHandler, self).notifyNodeInformation(conn, node_list)
super(BaseMasterHandler, self).notifyNodeInformation(
conn, timestamp, node_list)
for node_type, _, uuid, state, _ in node_list:
if uuid == self.app.uuid:
# This is me, do what the master tells me
......
......@@ -15,12 +15,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.handler import DelayEvent, EventHandler
from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
from neo.lib.protocol import Packets, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError
from ..exception import AlreadyPendingError
from ..transactions import ConflictError, NotRegisteredError
import time
# Log stores taking (incl. lock delays) more than this many seconds.
......@@ -38,12 +37,14 @@ class ClientOperationHandler(EventHandler):
t[4], t[0])
conn.answer(p)
def getEventQueue(self):
# for read RPCs
return self.app.tm
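# getEventQueue tells the generic dispatcher where to requeue a
# request whose handler raises DelayEvent. A hedged sketch of that
# dispatch loop (simplified; the real one lives in neo.lib.handler):
#
#   try:
#       method(conn, *args)
#   except DelayEvent:
#       # replayed later by the queue, with the original peer id
#       # restored so that the answer reaches the right request
#       self.getEventQueue().queueEvent(method, conn, args)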
def askObject(self, conn, oid, serial, tid):
app = self.app
if app.tm.loadLocked(oid):
# Delay the response.
app.queueEvent(self.askObject, conn, (oid, serial, tid))
return
raise DelayEvent
o = app.dm.getObject(oid, serial, tid)
try:
serial, next_serial, compression, checksum, data, data_serial = o
......@@ -58,9 +59,6 @@ class ClientOperationHandler(EventHandler):
compression, checksum, data, data_serial)
conn.answer(p)
def abortTransaction(self, conn, ttid):
self.app.tm.abort(ttid)
def askStoreTransaction(self, conn, ttid, *txn_info):
self.app.tm.register(conn, ttid)
self.app.tm.vote(ttid, txn_info)
......@@ -71,41 +69,29 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerVoteTransaction())
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, request_time):
data_serial, ttid, request_time):
try:
self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial, unlock)
checksum, data, data_serial)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerStoreObject(1, oid, err.tid))
except DelayedError:
# locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise
# AlreadyPendingError, to avoid making the client wait for an unneeded
# response.
try:
self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid,
unlock, request_time), key=(oid, ttid),
raise_on_duplicate=unlock)
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
conn.answer(Packets.AnswerStoreObject(err.tid))
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
conn.answer(Packets.AnswerStoreObject(None))
else:
if SLOW_STORE is not None:
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration)
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
conn.answer(Packets.AnswerStoreObject(None))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid, unlock):
compression, checksum, data, data_serial, ttid):
if 1 < compression:
raise ProtocolError('invalid compression value')
# register the transaction
......@@ -116,8 +102,33 @@ class ClientOperationHandler(EventHandler):
assert data_serial is None
else:
checksum = data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, time.time())
try:
self._askStoreObject(conn, oid, serial, compression,
checksum, data, data_serial, ttid, None)
except DelayEvent:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid, time.time()))
def askRebaseTransaction(self, conn, *args):
conn.answer(Packets.AnswerRebaseTransaction(
self.app.tm.rebase(conn, *args)))
def askRebaseObject(self, conn, ttid, oid):
try:
self._askRebaseObject(conn, ttid, oid, None)
except DelayEvent:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askRebaseObject,
conn, (ttid, oid, time.time()))
def _askRebaseObject(self, conn, ttid, oid, request_time):
conflict = self.app.tm.rebaseObject(ttid, oid)
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('RebaseObject delay: %.02fs', duration)
conn.answer(Packets.AnswerRebaseObject(conflict))
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
......@@ -159,25 +170,12 @@ class ClientOperationHandler(EventHandler):
p = Packets.AnswerObjectUndoSerial(object_tid_dict)
conn.answer(p)
def askHasLock(self, conn, ttid, oid):
locking_tid = self.app.tm.getLockingTID(oid)
logging.info('%r check lock of %r:%r', conn, dump(ttid), dump(oid))
if locking_tid is None:
state = LockState.NOT_LOCKED
elif locking_tid is ttid:
state = LockState.GRANTED
else:
state = LockState.GRANTED_TO_OTHER
conn.answer(Packets.AnswerHasLock(oid, state))
def askObjectHistory(self, conn, oid, first, last):
if first >= last:
raise ProtocolError('invalid offsets')
app = self.app
if app.tm.loadLocked(oid):
# Delay the response.
app.queueEvent(self.askObjectHistory, conn, (oid, first, last))
return
raise DelayEvent
history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None:
p = Errors.OidNotFound(dump(oid))
......@@ -185,36 +183,34 @@ class ClientOperationHandler(EventHandler):
p = Packets.AnswerObjectHistory(oid, history_list)
conn.answer(p)
def askCheckCurrentSerial(self, conn, ttid, serial, oid):
def askCheckCurrentSerial(self, conn, ttid, oid, serial):
self.app.tm.register(conn, ttid)
self._askCheckCurrentSerial(conn, ttid, serial, oid, time.time())
try:
self._askCheckCurrentSerial(conn, ttid, oid, serial, None)
except DelayEvent:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askCheckCurrentSerial,
conn, (ttid, oid, serial, time.time()))
def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time):
try:
self.app.tm.checkCurrentSerial(ttid, serial, oid)
self.app.tm.checkCurrentSerial(ttid, oid, serial)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid, err.tid))
except DelayedError:
# locked by a previous transaction, retry later
try:
self.app.queueEvent(self._askCheckCurrentSerial, conn, (ttid,
serial, oid, request_time), key=(oid, ttid))
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget serial check of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
conn.answer(Packets.AnswerCheckCurrentSerial(None))
else:
if SLOW_STORE is not None:
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
conn.answer(Packets.AnswerCheckCurrentSerial(None))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
......@@ -224,11 +220,12 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler):
conn.answer(Errors.ReadOnlyAccess(
'read-only access because cluster is in backuping mode'))
abortTransaction = _readOnly
askStoreTransaction = _readOnly
askVoteTransaction = _readOnly
askStoreObject = _readOnly
askFinalTID = _readOnly
askRebaseObject = _readOnly
askRebaseTransaction = _readOnly
# takes write lock & is only used when going to commit
askCheckCurrentSerial = _readOnly
......
......@@ -27,6 +27,10 @@ class IdentificationHandler(EventHandler):
def connectionLost(self, conn, new_state):
logging.warning('A connection was lost during identification')
def getEventQueue(self):
# for requestIdentification
return self.app.nm
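# Assumption based on this diff: requests delayed with DelayEvent are
# queued on the node manager until the expected notification from the
# master arrives, instead of being rejected with NotReadyError.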
def requestIdentification(self, conn, node_type, uuid, address, name,
id_timestamp):
self.checkClusterName(name)
......@@ -43,12 +47,6 @@ class IdentificationHandler(EventHandler):
if uuid == app.uuid:
raise ProtocolError("uuid conflict or loopback connection")
node = app.nm.getByUUID(uuid, id_timestamp)
if node is None:
# Never create nodes automatically, or we could get id
# conflicts. We must rely only on notifications from the
# master to recognize nodes. So this is not always an error:
# a notification may be on its way.
raise NotReadyError('unknown node: retry later')
if node.isBroken():
raise BrokenNodeDisallowedError
# choose the handler according to the node type
......
......@@ -31,8 +31,8 @@ class MasterOperationHandler(BaseMasterHandler):
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit()
def notifyTransactionFinished(self, conn, *args, **kw):
self.app.replicator.transactionFinished(*args, **kw)
def notifyTransactionFinished(self, conn, *args):
self.app.replicator.transactionFinished(*args)
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
......@@ -57,6 +57,10 @@ class MasterOperationHandler(BaseMasterHandler):
def notifyUnlockInformation(self, conn, ttid):
self.app.tm.unlock(ttid)
def abortTransaction(self, conn, ttid, _):
self.app.tm.abort(ttid)
self.app.replicator.transactionFinished(ttid)
def askPack(self, conn, tid):
app = self.app
logging.info('Pack started, up to %s...', dump(tid))
......
......@@ -17,7 +17,7 @@
import weakref
from functools import wraps
from neo.lib.connection import ConnectionClosed
from neo.lib.handler import EventHandler
from neo.lib.handler import DelayEvent, EventHandler
from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \
ZERO_HASH
......@@ -143,12 +143,14 @@ class StorageOperationHandler(EventHandler):
# Server (all methods must set connection as server so that it isn't closed
# if client tasks are finished)
def getEventQueue(self):
return self.app.tm
@checkFeedingConnection(check=True)
def askCheckTIDRange(self, conn, *args):
app = self.app
if app.tm.isLockedTid(args[3]): # max_tid
app.queueEvent(self.askCheckTIDRange, conn, args)
return
raise DelayEvent
msg_id = conn.getPeerId()
conn = weakref.proxy(conn)
def check():
......@@ -187,9 +189,7 @@ class StorageOperationHandler(EventHandler):
# NotifyTransactionFinished(M->S) + AskFetchTransactions(S->S)
# is faster than
# NotifyUnlockInformation(M->S)
app.queueEvent(self.askFetchTransactions, conn,
(partition, length, min_tid, max_tid, tid_list))
return
raise DelayEvent
msg_id = conn.getPeerId()
conn = weakref.proxy(conn)
peer_tid_set = set(tid_list)
......
......@@ -29,7 +29,7 @@ partitions.
2 parts, done sequentially:
- Transaction (metadata) replication
- Object (data) replication
- Object (metadata+data) replication
Both parts follow the same mechanism (sketched in code below):
- The range of data to replicate is split into chunks of FETCH_COUNT items
......@@ -37,15 +37,52 @@ Both parts follow the same mechanism:
- For every chunk, the requesting node sends to seeding node the list of items
it already has.
- Before answering, the seeding node sends 1 packet for every missing item.
For items that are already on the replicating node, there is no check that
the values match.
- The seeding node finally answers with the list of items to delete (usually
empty).
Replication is partial, starting from the greatest stored tid in the partition:
- For transactions, this tid is excluded from replication.
- For objects, this tid is included unless the storage already knows it has
all oids for it.
There is no check that item values on both nodes match.
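A hedged sketch of one chunk exchange, with hypothetical helper names (the
actual packets are AskFetchTransactions/AddTransaction and their object
counterparts):

    def replicateChunk(requester, seeder, min_tid, max_tid):
        # what the requesting node already has in this chunk
        known = set(requester.listIds(min_tid, max_tid))
        for item in seeder.iterItems(min_tid, max_tid):
            if item.tid in known:
                known.remove(item.tid)  # kept as-is, value not verified
            else:
                requester.store(item)   # 1 packet per missing item
        requester.delete(known)         # final answer: items to delete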
Internal replication, which is similar to RAID1 (as opposed to asynchronous
replication to a backup cluster), requires extra care with respect to
transactions. The transition of a cell from OUT_OF_DATE to UP_TO_DATE is done
in several steps.
A replicating node cannot rely on other nodes to fetch data that has recently
been, or is being, committed, because that cannot be done atomically: it could
miss writes between the processing of its request by a source node and the
reception of the answer.
Therefore, outdated cells are writable: a storage node asks the master for the
transactions being committed, and is then expected to receive in full, from
the client, any transaction started after this answer.
This in turn has other consequences:
- The client must not fail to write to a storage node after the above request
to the master: for this, the storage must have announced it is ready, and it
must delay identification of unknown clients (those for which it has not yet
received a notification from the master).
- Writes must be accepted blindly (i.e. without taking a write-lock) when a
storage node lacks the data to check for conflicts. This is possible because
1 up-to-date cell (for each partition) is enough to do these checks.
- Because the client cannot reliably know whether a storage node is expected
to receive a transaction in full, all writes must succeed.
- Even once replication is finished, we must wait until there are no lockless
writes left before announcing to the master that we are up-to-date.
To sum up (see the sketch after this list):
1. ask unfinished transactions -> (last_transaction, ttid_list)
2. replicate to last_transaction
3. wait for all ttid_list to be finished -> new last_transaction
4. replicate to last_transaction
5. no lockless write anymore, except to (oid, ttid) that were already
stored/checked without taking a lock
6. wait for all transactions with lockless writes to be finished
7. announce we're up-to-date
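A condensed sketch of these steps, using hypothetical method names (the real
logic is spread over Replicator and the storage TransactionManager):

    def makeCellUpToDate(self, offset):
        last_tid, ttid_list = self.askUnfinishedTransactions()  # step 1
        self.replicate(offset, last_tid)                         # step 2
        last_tid = self.waitFinished(ttid_list)                  # step 3
        self.replicate(offset, last_tid)                         # step 4
        self.stopNewLocklessWrites(offset)                       # step 5
        self.waitFinished(self.locklessWriteTransactions())      # step 6
        self.notifyReplicationDone(offset, last_tid)             # step 7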
On any failed write, the client marks the storage node as failed and stops
writing to it for the rest of the transaction. If there was any failed write,
the vote ends with an extra request to the master: the transaction will only
succeed if the failed nodes can be disconnected, forcing them to replicate
the missing data.
TODO: Packing and replication currently fail when they happen at the same time.
"""
......@@ -85,11 +122,6 @@ class Replicator(object):
if node is not None and node.isConnected(True):
return node.getConnection()
# XXX: We can't replicate unfinished transactions but do we need such
#      complex code? The backup mechanism does not rely on this: instead,
#      the upstream storage delays the answer. Maybe we can do the same
#      for internal replication.
def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
"""This is a callback from MasterOperationHandler."""
assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
......@@ -103,14 +135,19 @@ class Replicator(object):
self.replicate_dict[offset] = max_tid
self._nextPartition()
def transactionFinished(self, ttid, max_tid):
def transactionFinished(self, ttid, max_tid=None):
""" Callback from MasterOperationHandler """
self.ttid_set.remove(ttid)
try:
self.ttid_set.remove(ttid)
except KeyError:
assert max_tid is None, max_tid
return
min_ttid = min(self.ttid_set) if self.ttid_set else INVALID_TID
for offset, p in self.partition_dict.iteritems():
if p.max_ttid and p.max_ttid < min_ttid:
p.max_ttid = None
self.replicate_dict[offset] = max_tid
if max_tid:
self.replicate_dict[offset] = max_tid
self._nextPartition()
def getBackupTID(self):
......@@ -136,7 +173,7 @@ class Replicator(object):
app = self.app
pt = app.pt
uuid = app.uuid
self.partition_dict = p = {}
self.partition_dict = {}
self.replicate_dict = {}
self.source_dict = {}
self.ttid_set = set()
......@@ -160,8 +197,7 @@ class Replicator(object):
p.next_trans = p.next_obj = next_tid
p.max_ttid = None
if outdated_list:
self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
offset_list=outdated_list)
self.app.tm.replicating(outdated_list)
def notifyPartitionChanges(self, cell_list):
"""This is a callback from MasterOperationHandler."""
......@@ -190,8 +226,7 @@ class Replicator(object):
p.max_ttid = INVALID_TID
added_list.append(offset)
if added_list:
self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
offset_list=added_list)
self.app.tm.replicating(added_list)
if abort:
self.abort()
......@@ -325,9 +360,10 @@ class Replicator(object):
p = self.partition_dict[offset]
p.next_obj = add64(tid, 1)
self.updateBackupTID()
if not p.max_ttid:
p = Packets.NotifyReplicationDone(offset, tid)
self.app.master_conn.notify(p)
if p.max_ttid:
logging.debug("unfinished transactions: %r", self.ttid_set)
else:
self.app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay()
......
......@@ -37,6 +37,7 @@ from time import time
from struct import pack, unpack
from unittest.case import _ExpectedFailure, _UnexpectedSuccess
try:
from transaction.interfaces import IDataManager
from ZODB.utils import newTid
except ImportError:
pass
......@@ -378,6 +379,30 @@ class NeoUnitTestBase(NeoTestBase):
return packet
class TransactionalResource(object):
class _sortKey(object):
def __init__(self, last):
self._last = last
def __cmp__(self, other):
assert type(self) is not type(other), other
return 1 if self._last else -1
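# A resource created with last=True sorts after every other resource,
# and one with last=False sorts before; comparing two _sortKey
# instances would be ambiguous, hence the assertion above.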
def __init__(self, txn, last, **kw):
self.sortKey = lambda: self._sortKey(last)
for k in kw:
assert callable(IDataManager.get(k)), k
self.__dict__.update(kw)
txn.get().join(self)
def __getattr__(self, attr):
if callable(IDataManager.get(attr)):
return lambda *_: None
return self.__getattribute__(attr)
class Patch(object):
"""
Patch attributes and revert later automatically.
......
......@@ -43,9 +43,6 @@ def _ask(self, conn, packet, handler=None, **kw):
handler.dispatch(conn, conn.fakeReceived())
return self.getHandlerData()
def failing_tryToResolveConflict(oid, conflict_serial, serial, data):
raise ConflictError
class ClientApplicationTests(NeoUnitTestBase):
def setUp(self):
......@@ -73,7 +70,7 @@ class ClientApplicationTests(NeoUnitTestBase):
def _begin(self, app, txn, tid):
txn_context = app._txn_container.new(txn)
txn_context['ttid'] = tid
txn_context.ttid = tid
return txn_context
def getApp(self, master_nodes=None, name='test', **kw):
......@@ -115,7 +112,7 @@ class ClientApplicationTests(NeoUnitTestBase):
# connection to SN close
self.assertFalse(oid in cache._oid_dict)
conn = Mock({'getAddress': ('', 0)})
app.cp = Mock({'iterateForObject': [(Mock(), conn)]})
app.cp = Mock({'iterateForObject': (conn,)})
def fakeReceived(packet):
packet.setId(0)
conn.fakeReceived = iter((packet,)).next
......@@ -182,11 +179,8 @@ class ClientApplicationTests(NeoUnitTestBase):
tid = self.makeTID()
txn = self.makeTransactionObject()
app.master_conn = Mock()
conn = Mock()
self.assertRaises(StorageTransactionError, app.undo, tid,
txn, failing_tryToResolveConflict)
self.assertRaises(StorageTransactionError, app.undo, tid, txn)
# no packet sent
self.checkNoPacketSent(conn)
self.checkNoPacketSent(app.master_conn)
def test_connectToPrimaryNode(self):
......
......@@ -23,7 +23,6 @@ import socket
from struct import pack
from neo.lib.util import makeChecksum, u64
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest
......@@ -41,25 +40,6 @@ class Tree(Persistent):
self.right = Tree(depth)
self.left = Tree(depth)
# simple persistent object with conflict resolution
class PCounter(Persistent):
_value = 0
def value(self):
return self._value
def inc(self):
self._value += 1
class PCounterWithResolution(PCounter):
def _p_resolveConflict(self, old, saved, new):
new['_value'] = saved['_value'] + new['_value']
return new
class PObject(Persistent):
pass
......@@ -93,29 +73,6 @@ class ClientTests(NEOFunctionalTest):
conn = self.db.open(transaction_manager=txn)
return (txn, conn)
def testConflictResolutionTriggered1(self):
""" Check that ConflictError is raised on write conflict """
# create the initial objects
self.__setup()
t, c = self.makeTransaction()
c.root()['without_resolution'] = PCounter()
t.commit()
# first with no conflict resolution
t1, c1 = self.makeTransaction()
t2, c2 = self.makeTransaction()
o1 = c1.root()['without_resolution']
o2 = c2.root()['without_resolution']
self.assertEqual(o1.value(), 0)
self.assertEqual(o2.value(), 0)
o1.inc()
o2.inc()
o2.inc()
t1.commit()
self.assertEqual(o1.value(), 1)
self.assertEqual(o2.value(), 2)
self.assertRaises(ConflictError, t2.commit)
def testIsolationAtZopeLevel(self):
""" Check transaction isolation within zope connection """
self.__setup()
......@@ -254,33 +211,6 @@ class ClientTests(NEOFunctionalTest):
self.__checkTree(neo_conn.root()['trees'])
self.assertEqual(dump, self.__dump(neo_db.storage))
def testLockTimeout(self):
""" Hold a lock on an object to block a second transaction """
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
self.neo.start()
# BUG: The following 2 lines create 2 apps, i.e. 2 TCP connections
# to the storage, so there may be a race condition at the network
# level and 'st2.store' may take effect before 'st1.store'.
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = u'user'
t1.description = t2.description = u'desc'
oid = st1.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject())
st2.tpc_begin(t2)
st1.tpc_begin(t1)
st1.store(oid, rev, data, '', t1)
# this store will be delayed
st2.store(oid, rev, data, '', t2)
# the vote will time out as t1 never releases the lock
self.assertRaises(ConflictError, st2.tpc_vote, t2)
self.runWithTimeout(40, test)
def testIPv6Client(self):
""" Test the connectivity of an IPv6 connection for neo client """
......@@ -297,51 +227,6 @@ class ClientTests(NEOFunctionalTest):
db2, conn2 = self.neo.getZODBConnection()
self.runWithTimeout(40, test)
def testDelayedLocksCancelled(self):
"""
Hold a lock on an object, try to get another lock on the same
object to delay it. Then cancel the second transaction and check
that the lock is not held when the first transaction ends
"""
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = u'user'
t1.description = t2.description = u'desc'
oid = st1.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject())
st1.tpc_begin(t1)
st2.tpc_begin(t2)
# t1 owns the lock
st1.store(oid, rev, data, '', t1)
# t2's store is delayed
st2.store(oid, rev, data, '', t2)
# cancel t2, should cancel the store too
st2.tpc_abort(t2)
# finish t1, should release the lock
st1.tpc_vote(t1)
st1.tpc_finish(t1)
db3, conn3 = self.neo.getZODBConnection()
st3 = conn3._storage
t3 = transaction.Transaction()
t3.user = u'user'
t3.description = u'desc'
st3.tpc_begin(t3)
# retrieve the last revision
data, serial = st3.load(oid)
# try to store again, should not be delayed
st3.store(oid, serial, data, '', t3)
# the vote should not timeout
st3.tpc_vote(t3)
st3.tpc_finish(t3)
self.runWithTimeout(10, test)
def testGreaterOIDSaved(self):
"""
Store an object with an OID greater than the last generated by the
......
......@@ -19,8 +19,8 @@ from ..mock import Mock
from .. import NeoUnitTestBase
from neo.lib.util import p64
from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.handlers.client import ClientServiceHandler
from neo.master.app import Application
from neo.master.handlers.client import ClientServiceHandler
class MasterClientHandlerTests(NeoUnitTestBase):
......@@ -39,8 +39,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# define some variable to simulate client and storage node
self.client_port = 11022
self.storage_port = 10021
self.master_port = 10010
self.master_address = ('127.0.0.1', self.master_port)
self.client_address = ('127.0.0.1', self.client_port)
self.storage_address = ('127.0.0.1', self.storage_port)
self.storage_uuid = self.getStorageUUID()
......@@ -63,105 +61,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
)
return uuid
def checkAnswerBeginTransaction(self, conn):
return self.checkAnswerPacket(conn, Packets.AnswerBeginTransaction)
# Tests
def test_07_askBeginTransaction(self):
tid1 = self.getNextTID()
tid2 = self.getNextTID()
service = self.service
tm_org = self.app.tm
self.app.tm = tm = Mock({
'begin': '\x00\x00\x00\x00\x00\x00\x00\x01',
})
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
client_node = self.app.nm.getByUUID(client_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.askBeginTransaction(conn, None)
calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_node, None)
self.checkAnswerBeginTransaction(conn)
# Client asks for a TID
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.tm = tm_org
service.askBeginTransaction(conn, tid1)
calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_node, None)
packet = self.checkAnswerBeginTransaction(conn)
self.assertEqual(packet.decode(), (tid1, ))
def test_08_askNewOIDs(self):
service = self.service
oid1, oid2 = p64(1), p64(2)
self.app.tm.setLastOID(oid1)
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
for node in self.app.nm.getStorageList():
conn = self.getFakeConnection(node.getUUID(), node.getAddress())
node.setConnection(conn)
service.askNewOIDs(conn, 1)
self.assertTrue(self.app.tm.getLastOID() > oid1)
def test_09_askFinishTransaction(self):
service = self.service
# do the right job
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
storage_uuid = self.storage_uuid
storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address, is_server=True)
storage2_uuid = self.identifyToMasterNode(port=10022)
storage2_conn = self.getFakeConnection(storage2_uuid,
(self.storage_address[0], self.storage_address[1] + 1),
is_server=True)
self.app.setStorageReady(storage2_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.pt = Mock({
'getPartition': 0,
'getCellList': [
Mock({'getUUID': storage_uuid}),
Mock({'getUUID': storage2_uuid}),
],
'getPartitions': 2,
})
ttid = self.getNextTID()
service.askBeginTransaction(conn, ttid)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
# No packet sent if storage node is not ready
self.assertFalse(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, (), ())
self.checkNoPacketSent(storage_conn)
# ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, (), ())
self.checkAskPacket(storage_conn, Packets.AskLockInformation)
self.assertEqual(len(self.app.tm.registerForNotification(storage_uuid)), 1)
txn = self.app.tm[ttid]
pending_ttid = list(self.app.tm.registerForNotification(storage_uuid))[0]
self.assertEqual(ttid, pending_ttid)
self.assertEqual(len(txn.getOIDList()), 0)
self.assertEqual(len(txn.getUUIDList()), 1)
def test_connectionClosed(self):
# use a client uuid which has unfinished transactions
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.listening_conn = object() # mark as running
lptid = self.app.pt.getID()
self.assertEqual(self.app.nm.getByUUID(client_uuid).getState(),
NodeStates.RUNNING)
self.service.connectionClosed(conn)
# the node must have been removed, and no transactions must remain
self.assertEqual(self.app.nm.getByUUID(client_uuid), None)
self.assertEqual(lptid, self.app.pt.getID())
def test_askPack(self):
self.assertEqual(self.app.packing, None)
self.app.nm.createClient()
......
......@@ -19,9 +19,9 @@ from ..mock import Mock
from neo.lib import protocol
from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.app import Application
from neo.master.handlers.election import ClientElectionHandler, \
ServerElectionHandler
from neo.master.app import Application
from neo.lib.exception import ElectionFailure
from neo.lib.connection import ClientConnection
......
......@@ -24,66 +24,11 @@ from neo.master.transactions import TransactionManager
class testTransactionManager(NeoUnitTestBase):
def makeOID(self, i):
return pack('!Q', i)
def makeNode(self, node_type):
uuid = self.getNewUUID(node_type)
node = Mock({'getUUID': uuid, '__hash__': uuid, '__repr__': 'FakeNode'})
return uuid, node
def test_storageLost(self):
client1 = Mock({'__hash__': 1})
client2 = Mock({'__hash__': 2})
client3 = Mock({'__hash__': 3})
storage_1_uuid = self.getStorageUUID()
storage_2_uuid = self.getStorageUUID()
oid_list = [self.makeOID(1), ]
tm = TransactionManager(None)
# Transaction 1: 2 storage nodes involved, one will die and the other
# has already answered the lock request
msg_id_1 = 1
ttid1 = tm.begin(client1)
tid1 = tm.prepare(ttid1, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(ttid1, storage_2_uuid)
t1 = tm[ttid1]
self.assertFalse(t1.locked())
# Storage 1 dies:
# t1 is over
self.assertTrue(t1.storageLost(storage_1_uuid))
self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
del tm[ttid1]
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
ttid2 = tm.begin(client2)
tid2 = tm.prepare(ttid2, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_2)
t2 = tm[ttid2]
self.assertFalse(t2.locked())
# Storage 1 dies:
# t2 still waits for storage 2
self.assertFalse(t2.storageLost(storage_1_uuid))
self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
self.assertTrue(t2.lock(storage_2_uuid))
del tm[ttid2]
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
ttid3 = tm.begin(client3)
tid3 = tm.prepare(ttid3, 1, oid_list, [storage_2_uuid, ],
msg_id_3)
t3 = tm[ttid3]
self.assertFalse(t3.locked())
# Storage 1 dies:
# t3 doesn't care
self.assertFalse(t3.storageLost(storage_1_uuid))
self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
self.assertTrue(t3.lock(storage_2_uuid))
del tm[ttid3]
def testTIDUtils(self):
"""
Tests packTID/unpackTID/addTID.
......@@ -110,53 +55,14 @@ class testTransactionManager(NeoUnitTestBase):
unpackTID(addTID(packTID((2010, 11, 30, 23, 59), 2**32 - 1), 1)),
((2010, 12, 1, 0, 0), 0))
def testTransactionLock(self):
"""
Transaction lock is present to ensure invalidation TIDs are sent in
strictly increasing order.
Note: this implementation might change later, for more parallelism.
"""
client_uuid, client = self.makeNode(NodeTypes.CLIENT)
tm = TransactionManager(None)
# With a requested TID, lock spans from begin to remove
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1 = tm.begin(client, ttid1)
self.assertEqual(tid1, ttid1)
del tm[ttid1]
# Without a requested TID, lock spans from prepare to remove only
ttid3 = tm.begin(client)
ttid4 = tm.begin(client) # Doesn't raise
node = Mock({'getUUID': client_uuid, '__hash__': 0})
tid4 = tm.prepare(ttid4, 1, [], [], 0)
del tm[ttid4]
tm.prepare(ttid3, 1, [], [], 0)
def testClientDisconectsAfterBegin(self):
client_uuid1, node1 = self.makeNode(NodeTypes.CLIENT)
tm = TransactionManager(None)
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tm.begin(node1, tid1)
tm.begin(node1, 0, tid1)
tm.clientLost(node1)
self.assertTrue(tid1 not in tm)
def testUnlockPending(self):
callback = Mock()
uuid1, node1 = self.makeNode(NodeTypes.CLIENT)
uuid2, node2 = self.makeNode(NodeTypes.CLIENT)
storage_uuid = self.getStorageUUID()
tm = TransactionManager(callback)
ttid1 = tm.begin(node1)
ttid2 = tm.begin(node2)
tid1 = tm.prepare(ttid1, 1, [], [storage_uuid], 0)
tid2 = tm.prepare(ttid2, 1, [], [storage_uuid], 0)
tm.lock(ttid2, storage_uuid)
# txn 2 is still blocked by txn 1
self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
tm.lock(ttid1, storage_uuid)
# both transactions are unlocked when txn 1 is fully locked
self.assertEqual(len(callback.getNamedCalls('__call__')), 2)
if __name__ == '__main__':
unittest.main()
......@@ -20,7 +20,7 @@ from .. import NeoUnitTestBase
from neo.storage.app import Application
from neo.storage.handlers.client import ClientOperationHandler
from neo.lib.util import p64
from neo.lib.protocol import INVALID_TID, Packets, LockState
from neo.lib.protocol import INVALID_TID, Packets
class StorageClientHandlerTests(NeoUnitTestBase):
......@@ -100,24 +100,5 @@ class StorageClientHandlerTests(NeoUnitTestBase):
self.operation.askObjectUndoSerial(conn, tid, ltid, undone_tid, oid_list)
self.checkErrorPacket(conn)
def test_askHasLock(self):
tid_1 = self.getNextTID()
tid_2 = self.getNextTID()
oid = self.getNextTID()
def getLockingTID(oid):
return locking_tid
self.app.tm.getLockingTID = getLockingTID
for locking_tid, status in (
(None, LockState.NOT_LOCKED),
(tid_1, LockState.GRANTED),
(tid_2, LockState.GRANTED_TO_OTHER),
):
conn = self._getConnection()
self.operation.askHasLock(conn, tid_1, oid)
p_oid, p_status = self.checkAnswerPacket(conn,
Packets.AnswerHasLock).decode()
self.assertEqual(oid, p_oid)
self.assertEqual(status, p_status)
if __name__ == "__main__":
unittest.main()
......@@ -16,7 +16,6 @@
import unittest
from ..mock import Mock
from collections import deque
from .. import NeoUnitTestBase
from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler
......@@ -31,10 +30,6 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# create an application object
config = self.getStorageConfiguration(master_number=1)
self.app = Application(config)
self.app.transaction_dict = {}
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
# handler
self.operation = MasterOperationHandler(self.app)
# set pmn
......
......@@ -19,9 +19,7 @@ from ..mock import Mock
from .. import NeoUnitTestBase
from neo.storage.app import Application
from neo.lib.protocol import CellStates
from collections import deque
from neo.lib.pt import PartitionTable
from neo.storage.exception import AlreadyPendingError
class StorageAppTests(NeoUnitTestBase):
......@@ -31,8 +29,6 @@ class StorageAppTests(NeoUnitTestBase):
# create an application object
config = self.getStorageConfiguration(master_number=1)
self.app = Application(config)
self.app.event_queue = deque()
self.app.event_queue_dict = {}
def _tearDown(self, success):
self.app.close()
......@@ -121,26 +117,6 @@ class StorageAppTests(NeoUnitTestBase):
self.assertTrue(cell_list[0].getUUID() in (master_uuid, storage_uuid))
self.assertTrue(cell_list[1].getUUID() in (master_uuid, storage_uuid))
def test_02_queueEvent(self):
self.assertEqual(len(self.app.event_queue), 0)
msg_id = 1325136
event = Mock({'__repr__': 'event'})
conn = Mock({'__repr__': 'conn', 'getPeerId': msg_id})
key = 'foo'
self.app.queueEvent(event, conn, ("test", ), key=key)
self.assertEqual(len(self.app.event_queue), 1)
_key, _event, _msg_id, _conn, args = self.app.event_queue[0]
self.assertEqual(key, _key)
self.assertEqual(msg_id, _msg_id)
self.assertEqual(len(args), 1)
self.assertEqual(args[0], "test")
self.assertRaises(AlreadyPendingError, self.app.queueEvent, event,
conn, ("test2", ), key=key)
self.assertEqual(len(self.app.event_queue), 1)
self.app.queueEvent(event, conn, ("test3", ), key=key,
raise_on_duplicate=False)
self.assertEqual(len(self.app.event_queue), 2)
if __name__ == '__main__':
unittest.main()
......@@ -28,7 +28,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self.app = Mock()
# no history
self.app.dm = Mock({'getObjectHistory': []})
self.app.pt = Mock({'isAssigned': True})
self.app.pt = Mock({'isAssigned': True, 'getPartitions': 2})
self.app.em = Mock({'setTimeout': None})
self.manager = TransactionManager(self.app)
......
#
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import NeoTestBase
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from Queue import Queue
import unittest
class DispatcherTests(NeoTestBase):
def setUp(self):
NeoTestBase.setUp(self)
self.dispatcher = Dispatcher()
def testForget(self):
conn = object()
queue = Queue()
MARKER = object()
# Register an expectation
self.dispatcher.register(conn, 1, queue)
# ...and forget about it, returning the registered queue
forgotten_queue = self.dispatcher.forget(conn, 1)
self.assertTrue(queue is forgotten_queue, (queue, forgotten_queue))
# A ForgottenPacket must have been put in the queue
queue_conn, packet, kw = queue.get(block=False)
self.assertTrue(isinstance(packet, ForgottenPacket), packet)
# ...with appropriate packet id
self.assertEqual(packet.getId(), 1)
# ...and appropriate connection
self.assertTrue(conn is queue_conn, (conn, queue_conn))
# If forgotten twice, it must raise a KeyError
self.assertRaises(KeyError, self.dispatcher.forget, conn, 1)
# Event arrives, return value must be True (it was expected)
self.assertTrue(self.dispatcher.dispatch(conn, 1, MARKER, {}))
# ...but must not have reached the queue
self.assertTrue(queue.empty())
# Register an expectation
self.dispatcher.register(conn, 1, queue)
# ...and forget about it
self.dispatcher.forget(conn, 1)
queue.get(block=False)
# No exception must happen if connection is lost.
self.dispatcher.unregister(conn)
# Forgotten message's queue must not have received a "None"
self.assertTrue(queue.empty())
if __name__ == '__main__':
unittest.main()
......@@ -164,7 +164,7 @@ class NodeManagerTests(NeoUnitTestBase):
NodeStates.UNKNOWN, None),
)
# update manager content
manager.update(Mock(), node_list)
manager.update(Mock(), time(), node_list)
# - the client gets down
self.checkClients([])
# - master change it's address
......
......@@ -413,6 +413,9 @@ class ClientApplication(Node, neo.client.app.Application):
def __init__(self, master_nodes, name, **kw):
super(ClientApplication, self).__init__(master_nodes, name, **kw)
self.poll_thread.node_name = name
# Smaller cache to speed up tests that check behaviour when it is
# too small. See also NEOCluster.cache_size.
self._cache._max_size //= 1024
def _run(self):
try:
......@@ -433,6 +436,10 @@ class ClientApplication(Node, neo.client.app.Application):
conn = self.cp.getConnForNode(self.nm.getByUUID(peer.uuid))
yield conn
def extraCellSortKey(self, key):
return Patch(self.cp, getCellSortKey=lambda orig, cell:
(orig(cell), key(cell)))
class NeoCTL(neo.neoctl.app.NeoCTL):
def __init__(self, *args, **kw):
......@@ -541,7 +548,8 @@ class ConnectionFilter(object):
def remove(self, *filters):
with self.lock:
for filter in filters:
del self.filter_dict[filter]
for p in self.filter_dict.pop(filter):
p.revert()
self._retry()
def discard(self, *filters):
......@@ -711,6 +719,10 @@ class NEOCluster(object):
def primary_master(self):
master, = [master for master in self.master_list if master.primary]
return master
@property
def cache_size(self):
return self.client._cache._max_size
###
def __enter__(self):
......@@ -880,10 +892,6 @@ class NEOCluster(object):
txn = transaction.TransactionManager()
return txn, (self.db if db is None else db).open(txn)
def extraCellSortKey(self, key):
return Patch(self.client.cp, getCellSortKey=lambda orig, cell:
(orig(cell), key(cell)))
def moduloTID(self, partition):
"""Force generation of TIDs that will be stored in given partition"""
partition = p64(partition)
......@@ -956,13 +964,12 @@ class NEOThreadedTest(NeoTestBase):
return obj
return unpickler
class newThread(threading.Thread):
class newPausedThread(threading.Thread):
def __init__(self, func, *args, **kw):
threading.Thread.__init__(self)
self.__target = func, args, kw
self.daemon = True
self.start()
def run(self):
try:
......@@ -970,6 +977,8 @@ class NEOThreadedTest(NeoTestBase):
self.__exc_info = None
except:
self.__exc_info = sys.exc_info()
if self.__exc_info[0] is NEOThreadedTest.failureException:
traceback.print_exception(*self.__exc_info)
def join(self, timeout=None):
threading.Thread.join(self, timeout)
......@@ -978,12 +987,64 @@ class NEOThreadedTest(NeoTestBase):
del self.__exc_info
raise etype, value, tb
class newThread(newPausedThread):
def __init__(self, *args, **kw):
NEOThreadedTest.newPausedThread.__init__(self, *args, **kw)
self.start()
def commitWithStorageFailure(self, client, txn):
with Patch(client, _getFinalTID=lambda *_: None):
self.assertRaises(ConnectionClosed, txn.commit)
def assertPartitionTable(self, cluster, stats):
self.assertEqual(stats, '|'.join(cluster.admin.pt.formatRows()))
pt = cluster.admin.pt
index = [x.uuid for x in cluster.storage_list].index
self.assertEqual(stats, '|'.join(pt._formatRows(sorted(
pt.count_dict, key=lambda x: index(x.getUUID())))))
@staticmethod
def noConnection(jar, storage):
return Patch(jar.db().storage.app.cp, getConnForNode=lambda orig, node:
None if node.getUUID() == storage.uuid else orig(node))
@staticmethod
def readCurrent(ob):
ob._p_activate()
ob._p_jar.readCurrent(ob)
class ThreadId(list):
def __call__(self):
try:
return self.index(thread.get_ident())
except ValueError:
i = len(self)
self.append(thread.get_ident())
return i
@apply
class RandomConflictDict(dict):
# One must not depend on how Python iterates over dict keys, because this
# is implementation-defined behaviour. This patch makes sure conflict
# resolution does not.
def __new__(cls):
from neo.client.transactions import Transaction
def __init__(orig, self, *args):
orig(self, *args)
assert self.conflict_dict == {}
self.conflict_dict = dict.__new__(cls)
return Patch(Transaction, __init__=__init__)
def popitem(self):
try:
k = random.choice(list(self))
except IndexError:
raise KeyError
return k, self.pop(k)
def predictable_random(seed=None):
......