Commit 8e3c7b01 authored by Julien Muchembled's avatar Julien Muchembled

Implements backup using specialised storage nodes and relying on replication

Replication is also fully reimplemented:
- It is not done anymore on whole partitions.
- It runs at lowest priority not to degrades performance for client nodes.

Schema of MySQL table is changed to optimize storage layout: rows are now
grouped by age, for good partial replication performance.
This certainly also speeds up simple loads/stores.
parent 75d83690
...@@ -111,42 +111,17 @@ RC - Review output of pylint (CODE) ...@@ -111,42 +111,17 @@ RC - Review output of pylint (CODE)
consider using query(request, args) instead of query(request % args) consider using query(request, args) instead of query(request % args)
- Make listening address and port optionnal, and if they are not provided - Make listening address and port optionnal, and if they are not provided
listen on all interfaces on any available port. listen on all interfaces on any available port.
- Replication throttling (HIGH AVAILABILITY) - Make replication speed configurable (HIGH AVAILABILITY)
In its current implementation, replication runs at full speed, which In its current implementation, replication runs at lowest priority, not to
degrades performance for client nodes. Replication should allow degrades performance for client nodes. But when there's only 1 storage
throttling, and that throttling should be configurable. left for a partition, it may be wanted to guarantee a minimum speed to
See "Replication pipelining". avoid complete data loss if another failure happens too early.
- Pack segmentation & throttling (HIGH AVAILABILITY) - Pack segmentation & throttling (HIGH AVAILABILITY)
In its current implementation, pack runs in one call on all storage nodes In its current implementation, pack runs in one call on all storage nodes
at the same time, which lcoks down the whole cluster. This task should at the same time, which lcoks down the whole cluster. This task should
be split in chunks and processed in "background" on storage nodes. be split in chunks and processed in "background" on storage nodes.
Packing throttling should probably be at the lowest possible priority Packing throttling should probably be at the lowest possible priority
(below interactive use and below replication). (below interactive use and below replication).
- Replication pipelining (SPEED)
Replication work currently with too many exchanges between replicating
storage, and network latency can become a significant limit.
This should be changed to have just one initial request from
replicating storage, and multiple packets from reference storage with
database range checksums. When receiving these checksums, replicating
storage must compare with what it has, and ask row lists (might not even
be required) and data when there are differences. Quick fetching from
network with asynchronous checking (=queueing) + congestion control
(asking reference storage's to pause its packet flow) will probably be
required.
This should make it easier to throttle replication workload on reference
storage node, as it can decide to postpone replication-related packets on
its own.
- Partial replication (SPEED)
In its current implementation, replication always happens on a whole
partition. In typical use, only a few last transactions will have been
missed, so replicating only past a given TID would be much faster.
To achieve this, storage nodes must store 2 values:
- a pack identifier, which must be different each time a pack occurs
(increasing number sequence, TID-ish, etc) to trigger a
whole-partition replication when a pack happened (this could be
improved too, later)
- the latest (-ish) transaction committed locally, to use as a lower
replication boundary
- tpc_finish failures propagation to master (FUNCTIONALITY) - tpc_finish failures propagation to master (FUNCTIONALITY)
When asked to lock transaction data, if something goes wrong the master When asked to lock transaction data, if something goes wrong the master
node must be informed. node must be informed.
......
...@@ -9,7 +9,7 @@ SQL commands to migrate each storage from NEO 0.10.x:: ...@@ -9,7 +9,7 @@ SQL commands to migrate each storage from NEO 0.10.x::
CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial; CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial;
DROP TABLE data; DROP TABLE data;
RENAME TABLE new_data TO data; RENAME TABLE new_data TO data;
CREATE TABLE new_obj (partition SMALLINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL, serial BIGINT UNSIGNED NOT NULL, data_id BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL, PRIMARY KEY (partition, oid, serial), KEY (data_id)) ENGINE = InnoDB SELECT partition, oid, serial, data.id as data_id, value_serial FROM obj LEFT JOIN data ON (obj.hash=data.hash); CREATE TABLE new_obj (partition SMALLINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL, serial BIGINT UNSIGNED NOT NULL, data_id BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL, PRIMARY KEY (partition, serial, oid), KEY (partition, oid, serial), KEY (data_id)) ENGINE = InnoDB SELECT partition, oid, serial, data.id as data_id, value_serial FROM obj LEFT JOIN data ON (obj.hash=data.hash);
DROP TABLE obj; DROP TABLE obj;
RENAME TABLE new_obj TO obj; RENAME TABLE new_obj TO obj;
ALTER TABLE tobj CHANGE hash data_id BIGINT UNSIGNED NULL; ALTER TABLE tobj CHANGE hash data_id BIGINT UNSIGNED NULL;
......
...@@ -959,7 +959,7 @@ class Application(object): ...@@ -959,7 +959,7 @@ class Application(object):
tid_list = [] tid_list = []
# request a tid list for each partition # request a tid list for each partition
for offset in xrange(self.pt.getPartitions()): for offset in xrange(self.pt.getPartitions()):
p = Packets.AskTIDsFrom(start, stop, limit, [offset]) p = Packets.AskTIDsFrom(start, stop, limit, offset)
for node, conn in self.cp.iterateForObject(offset, readable=True): for node, conn in self.cp.iterateForObject(offset, readable=True):
try: try:
r = self._askStorage(conn, p) r = self._askStorage(conn, p)
......
...@@ -90,3 +90,8 @@ class ConfigurationManager(object): ...@@ -90,3 +90,8 @@ class ConfigurationManager(object):
# only from command line # only from command line
return util.bin(self.argument_list.get('uuid', None)) return util.bin(self.argument_list.get('uuid', None))
def getUpstreamCluster(self):
return self.__get('upstream_cluster', True)
def getUpstreamMasters(self):
return util.parseMasterList(self.__get('upstream_masters'))
...@@ -79,6 +79,9 @@ class EpollEventManager(object): ...@@ -79,6 +79,9 @@ class EpollEventManager(object):
self.epoll.unregister(fd) self.epoll.unregister(fd)
del self.connection_dict[fd] del self.connection_dict[fd]
def isIdle(self):
return not (self._pending_processing or self.writer_set)
def _addPendingConnection(self, conn): def _addPendingConnection(self, conn):
pending_processing = self._pending_processing pending_processing = self._pending_processing
if conn not in pending_processing: if conn not in pending_processing:
......
...@@ -48,6 +48,7 @@ class ErrorCodes(Enum): ...@@ -48,6 +48,7 @@ class ErrorCodes(Enum):
PROTOCOL_ERROR = Enum.Item(4) PROTOCOL_ERROR = Enum.Item(4)
BROKEN_NODE = Enum.Item(5) BROKEN_NODE = Enum.Item(5)
ALREADY_PENDING = Enum.Item(7) ALREADY_PENDING = Enum.Item(7)
REPLICATION_ERROR = Enum.Item(8)
ErrorCodes = ErrorCodes() ErrorCodes = ErrorCodes()
class ClusterStates(Enum): class ClusterStates(Enum):
...@@ -55,6 +56,9 @@ class ClusterStates(Enum): ...@@ -55,6 +56,9 @@ class ClusterStates(Enum):
VERIFYING = Enum.Item(2) VERIFYING = Enum.Item(2)
RUNNING = Enum.Item(3) RUNNING = Enum.Item(3)
STOPPING = Enum.Item(4) STOPPING = Enum.Item(4)
STARTING_BACKUP = Enum.Item(5)
BACKINGUP = Enum.Item(6)
STOPPING_BACKUP = Enum.Item(7)
ClusterStates = ClusterStates() ClusterStates = ClusterStates()
class NodeTypes(Enum): class NodeTypes(Enum):
...@@ -117,6 +121,7 @@ ZERO_TID = '\0' * 8 ...@@ -117,6 +121,7 @@ ZERO_TID = '\0' * 8
ZERO_OID = '\0' * 8 ZERO_OID = '\0' * 8
OID_LEN = len(INVALID_OID) OID_LEN = len(INVALID_OID)
TID_LEN = len(INVALID_TID) TID_LEN = len(INVALID_TID)
MAX_TID = '\x7f' + '\xff' * 7 # SQLite does not accept numbers above 2^63-1
UUID_NAMESPACES = { UUID_NAMESPACES = {
NodeTypes.STORAGE: 'S', NodeTypes.STORAGE: 'S',
...@@ -723,6 +728,7 @@ class LastIDs(Packet): ...@@ -723,6 +728,7 @@ class LastIDs(Packet):
POID('last_oid'), POID('last_oid'),
PTID('last_tid'), PTID('last_tid'),
PPTID('last_ptid'), PPTID('last_ptid'),
PTID('backup_tid'),
) )
class PartitionTable(Packet): class PartitionTable(Packet):
...@@ -760,16 +766,6 @@ class PartitionChanges(Packet): ...@@ -760,16 +766,6 @@ class PartitionChanges(Packet):
), ),
) )
class ReplicationDone(Packet):
"""
Notify the master node that a partition has been successully replicated from
a storage to another.
S -> M
"""
_fmt = PStruct('notify_replication_done',
PNumber('offset'),
)
class StartOperation(Packet): class StartOperation(Packet):
""" """
Tell a storage nodes to start an operation. Until a storage node receives Tell a storage nodes to start an operation. Until a storage node receives
...@@ -965,7 +961,7 @@ class GetObject(Packet): ...@@ -965,7 +961,7 @@ class GetObject(Packet):
""" """
Ask a stored object by its OID and a serial or a TID if given. If a serial Ask a stored object by its OID and a serial or a TID if given. If a serial
is specified, the specified revision of an object will be returned. If is specified, the specified revision of an object will be returned. If
a TID is specified, an object right before the TID will be returned. S,C -> S. a TID is specified, an object right before the TID will be returned. C -> S.
Answer the requested object. S -> C. Answer the requested object. S -> C.
""" """
_fmt = PStruct('ask_object', _fmt = PStruct('ask_object',
...@@ -1003,16 +999,14 @@ class TIDList(Packet): ...@@ -1003,16 +999,14 @@ class TIDList(Packet):
class TIDListFrom(Packet): class TIDListFrom(Packet):
""" """
Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
S -> S. C -> S.
Answer the requested TIDs. S -> S Answer the requested TIDs. S -> C
""" """
_fmt = PStruct('tid_list_from', _fmt = PStruct('tid_list_from',
PTID('min_tid'), PTID('min_tid'),
PTID('max_tid'), PTID('max_tid'),
PNumber('length'), PNumber('length'),
PList('partition_list', PNumber('partition'),
PNumber('partition'),
),
) )
_answer = PStruct('answer_tids', _answer = PStruct('answer_tids',
...@@ -1054,27 +1048,6 @@ class ObjectHistory(Packet): ...@@ -1054,27 +1048,6 @@ class ObjectHistory(Packet):
PFHistoryList, PFHistoryList,
) )
class ObjectHistoryFrom(Packet):
"""
Ask history information for a given object. The order of serials is
ascending, and starts at (or above) min_serial for min_oid. S -> S.
Answer the requested serials. S -> S.
"""
_fmt = PStruct('ask_object_history',
POID('min_oid'),
PTID('min_serial'),
PTID('max_serial'),
PNumber('length'),
PNumber('partition'),
)
_answer = PStruct('ask_finish_transaction',
PDict('object_dict',
POID('oid'),
PFTidList,
),
)
class PartitionList(Packet): class PartitionList(Packet):
""" """
All the following messages are for neoctl to admin node All the following messages are for neoctl to admin node
...@@ -1341,6 +1314,110 @@ class NotifyReady(Packet): ...@@ -1341,6 +1314,110 @@ class NotifyReady(Packet):
""" """
pass pass
# replication
class FetchTransactions(Packet):
"""
S -> S
"""
_fmt = PStruct('ask_transaction_list',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
PFTidList, # already known transactions
)
_answer = PStruct('answer_transaction_list',
PTID('pack_tid'),
PTID('next_tid'),
PFTidList, # transactions to delete
)
class AddTransaction(Packet):
"""
S -> S
"""
_fmt = PStruct('add_transaction',
PTID('tid'),
PString('user'),
PString('description'),
PString('extension'),
PBoolean('packed'),
PFOidList,
)
class FetchObjects(Packet):
"""
S -> S
"""
_fmt = PStruct('ask_object_list',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
POID('min_oid'),
PDict('object_dict', # already known objects
PTID('serial'),
PFOidList,
),
)
_answer = PStruct('answer_object_list',
PTID('pack_tid'),
PTID('next_tid'),
POID('next_oid'),
PDict('object_dict', # objects to delete
PTID('serial'),
PFOidList,
),
)
class AddObject(Packet):
"""
S -> S
"""
_fmt = PStruct('add_object',
POID('oid'),
PTID('serial'),
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
PTID('data_serial'),
)
class Replicate(Packet):
"""
M -> S
"""
_fmt = PStruct('replicate',
PTID('tid'),
PString('upstream_name'),
PDict('source_dict',
PNumber('partition'),
PAddress('address'),
)
)
class ReplicationDone(Packet):
"""
Notify the master node that a partition has been successully replicated from
a storage to another.
S -> M
"""
_fmt = PStruct('notify_replication_done',
PNumber('offset'),
PTID('tid'),
)
class Truncate(Packet):
"""
M -> S
"""
_fmt = PStruct('ask_truncate',
PTID('tid'),
)
_answer = PFEmpty
StaticRegistry = {} StaticRegistry = {}
def register(request, ignore_when_closed=None): def register(request, ignore_when_closed=None):
""" Register a packet in the packet registry """ """ Register a packet in the packet registry """
...@@ -1516,16 +1593,12 @@ class Packets(dict): ...@@ -1516,16 +1593,12 @@ class Packets(dict):
ClusterState) ClusterState)
NotifyLastOID = register( NotifyLastOID = register(
NotifyLastOID) NotifyLastOID)
NotifyReplicationDone = register(
ReplicationDone)
AskObjectUndoSerial, AnswerObjectUndoSerial = register( AskObjectUndoSerial, AnswerObjectUndoSerial = register(
ObjectUndoSerial) ObjectUndoSerial)
AskHasLock, AnswerHasLock = register( AskHasLock, AnswerHasLock = register(
HasLock) HasLock)
AskTIDsFrom, AnswerTIDsFrom = register( AskTIDsFrom, AnswerTIDsFrom = register(
TIDListFrom) TIDListFrom)
AskObjectHistoryFrom, AnswerObjectHistoryFrom = register(
ObjectHistoryFrom)
AskPack, AnswerPack = register( AskPack, AnswerPack = register(
Pack, ignore_when_closed=False) Pack, ignore_when_closed=False)
AskCheckTIDRange, AnswerCheckTIDRange = register( AskCheckTIDRange, AnswerCheckTIDRange = register(
...@@ -1540,6 +1613,20 @@ class Packets(dict): ...@@ -1540,6 +1613,20 @@ class Packets(dict):
CheckCurrentSerial) CheckCurrentSerial)
NotifyTransactionFinished = register( NotifyTransactionFinished = register(
NotifyTransactionFinished) NotifyTransactionFinished)
Replicate = register(
Replicate)
NotifyReplicationDone = register(
ReplicationDone)
AskFetchTransactions, AnswerFetchTransactions = register(
FetchTransactions)
AskFetchObjects, AnswerFetchObjects = register(
FetchObjects)
AddTransaction = register(
AddTransaction)
AddObject = register(
AddObject)
AskTruncate, AnswerTruncate = register(
Truncate)
def Errors(): def Errors():
registry_dict = {} registry_dict = {}
......
...@@ -150,6 +150,11 @@ class PartitionTable(object): ...@@ -150,6 +150,11 @@ class PartitionTable(object):
return True return True
return False return False
def getCell(self, offset, uuid):
for cell in self.partition_list[offset]:
if cell.getUUID() == uuid:
return cell
def setCell(self, offset, node, state): def setCell(self, offset, node, state):
if state == CellStates.DISCARDED: if state == CellStates.DISCARDED:
return self.removeCell(offset, node) return self.removeCell(offset, node)
...@@ -157,28 +162,19 @@ class PartitionTable(object): ...@@ -157,28 +162,19 @@ class PartitionTable(object):
raise PartitionTableException('Invalid node state') raise PartitionTableException('Invalid node state')
self.count_dict.setdefault(node, 0) self.count_dict.setdefault(node, 0)
row = self.partition_list[offset] for cell in self.partition_list[offset]:
if len(row) == 0: if cell.getNode() is node:
# Create a new row. if not cell.isFeeding():
row = [Cell(node, state), ] self.count_dict[node] -= 1
if state != CellStates.FEEDING: cell.setState(state)
self.count_dict[node] += 1 break
self.partition_list[offset] = row
self.num_filled_rows += 1
else: else:
# XXX this can be slow, but it is necessary to remove a duplicate, row = self.partition_list[offset]
# if any. self.num_filled_rows += not row
for cell in row:
if cell.getNode() == node:
row.remove(cell)
if not cell.isFeeding():
self.count_dict[node] -= 1
break
row.append(Cell(node, state)) row.append(Cell(node, state))
if state != CellStates.FEEDING: if state != CellStates.FEEDING:
self.count_dict[node] += 1 self.count_dict[node] += 1
return (offset, node.getUUID(), state) return offset, node.getUUID(), state
def removeCell(self, offset, node): def removeCell(self, offset, node):
row = self.partition_list[offset] row = self.partition_list[offset]
......
...@@ -28,6 +28,10 @@ from neo.lib.event import EventManager ...@@ -28,6 +28,10 @@ from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection, ClientConnection from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.lib.util import dump from neo.lib.util import dump
class StateChangedException(Exception): pass
from .backup_app import BackupApplication
from .handlers import election, identification, secondary from .handlers import election, identification, secondary
from .handlers import administration, client, storage, shutdown from .handlers import administration, client, storage, shutdown
from .pt import PartitionTable from .pt import PartitionTable
...@@ -41,6 +45,8 @@ class Application(object): ...@@ -41,6 +45,8 @@ class Application(object):
packing = None packing = None
# Latest completely commited TID # Latest completely commited TID
last_transaction = ZERO_TID last_transaction = ZERO_TID
backup_tid = None
backup_app = None
def __init__(self, config): def __init__(self, config):
# Internal attributes. # Internal attributes.
...@@ -90,16 +96,29 @@ class Application(object): ...@@ -90,16 +96,29 @@ class Application(object):
self._current_manager = None self._current_manager = None
# backup
upstream_cluster = config.getUpstreamCluster()
if upstream_cluster:
if upstream_cluster == self.name:
raise ValueError("upstream cluster name must be"
" different from cluster name")
self.backup_app = BackupApplication(self, upstream_cluster,
*config.getUpstreamMasters())
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
def close(self): def close(self):
self.listening_conn = None self.listening_conn = None
if self.backup_app is not None:
self.backup_app.close()
self.nm.close() self.nm.close()
self.em.close() self.em.close()
del self.__dict__ del self.__dict__
def log(self): def log(self):
self.em.log() self.em.log()
if self.backup_app is not None:
self.backup_app.log()
self.nm.log() self.nm.log()
self.tm.log() self.tm.log()
if self.pt is not None: if self.pt is not None:
...@@ -257,27 +276,29 @@ class Application(object): ...@@ -257,27 +276,29 @@ class Application(object):
a shutdown. a shutdown.
""" """
neo.lib.logging.info('provide service') neo.lib.logging.info('provide service')
em = self.em poll = self.em.poll
self.tm.reset() self.tm.reset()
self.changeClusterState(ClusterStates.RUNNING) self.changeClusterState(ClusterStates.RUNNING)
# Now everything is passive. # Now everything is passive.
while True: try:
try: while True:
em.poll(1) poll(1)
except OperationFailure: except OperationFailure:
# If not operational, send Stop Operation packets to storage # If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes. # nodes and client nodes. Abort connections to client nodes.
neo.lib.logging.critical('No longer operational') neo.lib.logging.critical('No longer operational')
for node in self.nm.getIdentifiedList(): except StateChangedException, e:
if node.isStorage() or node.isClient(): assert e.args[0] == ClusterStates.STARTING_BACKUP
node.notify(Packets.StopOperation()) self.backup_tid = tid = self.getLastTransaction()
if node.isClient(): self.pt.setBackupTidDict(dict((node.getUUID(), tid)
node.getConnection().abort() for node in self.nm.getStorageList(only_identified=True)))
for node in self.nm.getIdentifiedList():
# Then, go back, and restart. if node.isStorage() or node.isClient():
return node.notify(Packets.StopOperation())
if node.isClient():
node.getConnection().abort()
def playPrimaryRole(self): def playPrimaryRole(self):
neo.lib.logging.info( neo.lib.logging.info(
...@@ -314,7 +335,13 @@ class Application(object): ...@@ -314,7 +335,13 @@ class Application(object):
self.runManager(RecoveryManager) self.runManager(RecoveryManager)
while True: while True:
self.runManager(VerificationManager) self.runManager(VerificationManager)
self.provideService() if self.backup_tid:
if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
self.backup_app.provideService()
else:
self.provideService()
def playSecondaryRole(self): def playSecondaryRole(self):
""" """
...@@ -364,7 +391,8 @@ class Application(object): ...@@ -364,7 +391,8 @@ class Application(object):
# select the storage handler # select the storage handler
client_handler = client.ClientServiceHandler(self) client_handler = client.ClientServiceHandler(self)
if state == ClusterStates.RUNNING: if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP):
storage_handler = storage.StorageServiceHandler(self) storage_handler = storage.StorageServiceHandler(self)
elif self._current_manager is not None: elif self._current_manager is not None:
storage_handler = self._current_manager.getHandler() storage_handler = self._current_manager.getHandler()
...@@ -389,8 +417,9 @@ class Application(object): ...@@ -389,8 +417,9 @@ class Application(object):
handler = storage_handler handler = storage_handler
else: else:
continue # keep handler continue # keep handler
conn.setHandler(handler) if type(handler) is not type(conn.getLastHandler()):
handler.connectionCompleted(conn) conn.setHandler(handler)
handler.connectionCompleted(conn)
self.cluster_state = state self.cluster_state = state
def getNewUUID(self, node_type): def getNewUUID(self, node_type):
...@@ -437,19 +466,13 @@ class Application(object): ...@@ -437,19 +466,13 @@ class Application(object):
sys.exit() sys.exit()
def identifyStorageNode(self, uuid, node): def identifyStorageNode(self, uuid, node):
state = NodeStates.RUNNING if self.cluster_state == ClusterStates.STOPPING:
handler = None
if self.cluster_state == ClusterStates.RUNNING:
if uuid is None or node is None:
# same as for verification
state = NodeStates.PENDING
handler = storage.StorageServiceHandler(self)
elif self.cluster_state == ClusterStates.STOPPING:
raise NotReadyError raise NotReadyError
else: state = NodeStates.RUNNING
raise RuntimeError('unhandled cluster state: %s' % if uuid is None or node is None:
(self.cluster_state, )) # same as for verification
return (uuid, state, handler) state = NodeStates.PENDING
return uuid, state, storage.StorageServiceHandler(self)
def identifyNode(self, node_type, uuid, node): def identifyNode(self, node_type, uuid, node):
......
This diff is collapsed.
...@@ -18,15 +18,18 @@ ...@@ -18,15 +18,18 @@
import neo import neo
from . import MasterHandler from . import MasterHandler
from ..app import StateChangedException
from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError
from neo.lib.protocol import Errors from neo.lib.protocol import Errors
from neo.lib.util import dump from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = { CLUSTER_STATE_WORKFLOW = {
# destination: sources # destination: sources
ClusterStates.VERIFYING: set([ClusterStates.RECOVERING]), ClusterStates.VERIFYING: (ClusterStates.RECOVERING,),
ClusterStates.STOPPING: set([ClusterStates.RECOVERING, ClusterStates.STARTING_BACKUP: (ClusterStates.RUNNING,
ClusterStates.VERIFYING, ClusterStates.RUNNING]), ClusterStates.STOPPING_BACKUP),
ClusterStates.STOPPING_BACKUP: (ClusterStates.BACKINGUP,
ClusterStates.STARTING_BACKUP),
} }
class AdministrationHandler(MasterHandler): class AdministrationHandler(MasterHandler):
...@@ -42,16 +45,17 @@ class AdministrationHandler(MasterHandler): ...@@ -42,16 +45,17 @@ class AdministrationHandler(MasterHandler):
conn.answer(Packets.AnswerPrimary(app.uuid, [])) conn.answer(Packets.AnswerPrimary(app.uuid, []))
def setClusterState(self, conn, state): def setClusterState(self, conn, state):
app = self.app
# check request # check request
if state not in CLUSTER_STATE_WORKFLOW: try:
if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
raise ProtocolError('Can not switch to this state')
except KeyError:
raise ProtocolError('Invalid state requested') raise ProtocolError('Invalid state requested')
valid_current_states = CLUSTER_STATE_WORKFLOW[state]
if self.app.cluster_state not in valid_current_states:
raise ProtocolError('Cannot switch to this state')
# change state # change state
if state == ClusterStates.VERIFYING: if state == ClusterStates.VERIFYING:
storage_list = self.app.nm.getStorageList(only_identified=True) storage_list = app.nm.getStorageList(only_identified=True)
if not storage_list: if not storage_list:
raise ProtocolError('Cannot exit recovery without any ' raise ProtocolError('Cannot exit recovery without any '
'storage node') 'storage node')
...@@ -60,15 +64,18 @@ class AdministrationHandler(MasterHandler): ...@@ -60,15 +64,18 @@ class AdministrationHandler(MasterHandler):
if node.getConnection().isPending(): if node.getConnection().isPending():
raise ProtocolError('Cannot exit recovery now: node %r is ' raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, )) 'entering cluster' % (node, ))
self.app._startup_allowed = True app._startup_allowed = True
else: state = app.cluster_state
self.app.changeClusterState(state) elif state == ClusterStates.STARTING_BACKUP:
if app.tm.hasPending() or app.nm.getClientList(True):
raise ProtocolError("Can not switch to %s state with pending"
" transactions or connected clients" % state)
elif state != ClusterStates.STOPPING_BACKUP:
app.changeClusterState(state)
# answer
conn.answer(Errors.Ack('Cluster state changed')) conn.answer(Errors.Ack('Cluster state changed'))
if state == ClusterStates.STOPPING: if state != app.cluster_state:
self.app.cluster_state = state raise StateChangedException(state)
self.app.shutdown()
def setNodeState(self, conn, uuid, state, modify_partition_table): def setNodeState(self, conn, uuid, state, modify_partition_table):
neo.lib.logging.info("set node state for %s-%s : %s" % neo.lib.logging.info("set node state for %s-%s : %s" %
......
##############################################################################
#
# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
# Julien Muchembled <jm@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates
class BackupHandler(EventHandler):
"""Handler dedicated to upstream master during BACKINGUP state"""
def connectionLost(self, conn, new_state):
if self.app.app.listening_conn: # if running
raise PrimaryFailure('connection lost')
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def answerNodeInformation(self, conn):
pass
def notifyNodeInformation(self, conn, node_list):
self.app.nm.update(node_list)
def answerLastTransaction(self, conn, tid):
app = self.app
app.invalidatePartitions(tid, set(xrange(app.pt.getPartitions())))
def invalidateObjects(self, conn, tid, oid_list):
app = self.app
getPartition = app.app.pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.add(getPartition(tid))
app.invalidatePartitions(tid, partition_set)
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import neo.lib import neo.lib
from neo.lib.protocol import Packets, ProtocolError from neo.lib.protocol import ClusterStates, Packets, ProtocolError
from neo.lib.exception import OperationFailure from neo.lib.exception import OperationFailure
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.connector import ConnectorConnectionClosedException from neo.lib.connector import ConnectorConnectionClosedException
...@@ -45,14 +45,18 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -45,14 +45,18 @@ class StorageServiceHandler(BaseServiceHandler):
if not app.pt.operational(): if not app.pt.operational():
raise OperationFailure, 'cannot continue operation' raise OperationFailure, 'cannot continue operation'
app.tm.forget(conn.getUUID()) app.tm.forget(conn.getUUID())
if app.getClusterState() == ClusterStates.BACKINGUP:
app.backup_app.nodeLost(node)
if app.packing is not None: if app.packing is not None:
self.answerPack(conn, False) self.answerPack(conn, False)
def askLastIDs(self, conn): def askLastIDs(self, conn):
app = self.app app = self.app
loid = app.tm.getLastOID() conn.answer(Packets.AnswerLastIDs(
ltid = app.tm.getLastTID() app.tm.getLastOID(),
conn.answer(Packets.AnswerLastIDs(loid, ltid, app.pt.getID())) app.tm.getLastTID(),
app.pt.getID(),
app.backup_tid))
def askUnfinishedTransactions(self, conn): def askUnfinishedTransactions(self, conn):
tm = self.app.tm tm = self.app.tm
...@@ -68,15 +72,26 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -68,15 +72,26 @@ class StorageServiceHandler(BaseServiceHandler):
# transaction locked on this storage node # transaction locked on this storage node
self.app.tm.lock(ttid, conn.getUUID()) self.app.tm.lock(ttid, conn.getUUID())
def notifyReplicationDone(self, conn, offset): def notifyReplicationDone(self, conn, offset, tid):
node = self.app.nm.getByUUID(conn.getUUID()) app = self.app
neo.lib.logging.debug("%s is up for offset %s" % (node, offset)) node = app.nm.getByUUID(conn.getUUID())
try: if app.backup_tid:
cell_list = self.app.pt.setUpToDate(node, offset) cell_list = app.backup_app.notifyReplicationDone(node, offset, tid)
except PartitionTableException, e: if not cell_list:
raise ProtocolError(str(e)) return
else:
try:
cell_list = self.app.pt.setUpToDate(node, offset)
if not cell_list:
raise ProtocolError('Non-oudated partition')
except PartitionTableException, e:
raise ProtocolError(str(e))
neo.lib.logging.debug("%s is up for offset %s", node, offset)
self.app.broadcastPartitionChanges(cell_list) self.app.broadcastPartitionChanges(cell_list)
def answerTruncate(self, conn):
pass
def answerPack(self, conn, status): def answerPack(self, conn, status):
app = self.app app = self.app
if app.packing is not None: if app.packing is not None:
......
...@@ -17,11 +17,25 @@ ...@@ -17,11 +17,25 @@
import neo.lib.pt import neo.lib.pt
from struct import pack, unpack from struct import pack, unpack
from neo.lib.protocol import CellStates from neo.lib.protocol import CellStates, ZERO_TID
from neo.lib.pt import PartitionTableException
from neo.lib.pt import PartitionTable
class PartitionTable(PartitionTable):
class Cell(neo.lib.pt.Cell):
replicating = ZERO_TID
def setState(self, state):
try:
if CellStates.OUT_OF_DATE == state != self.state:
del self.backup_tid, self.replicating
except AttributeError:
pass
return super(Cell, self).setState(state)
neo.lib.pt.Cell = Cell
class PartitionTable(neo.lib.pt.PartitionTable):
"""This class manages a partition table for the primary master node""" """This class manages a partition table for the primary master node"""
def setID(self, id): def setID(self, id):
...@@ -54,7 +68,7 @@ class PartitionTable(PartitionTable): ...@@ -54,7 +68,7 @@ class PartitionTable(PartitionTable):
row = [] row = []
for _ in xrange(repeats): for _ in xrange(repeats):
node = node_list[index] node = node_list[index]
row.append(neo.lib.pt.Cell(node)) row.append(Cell(node))
self.count_dict[node] = self.count_dict.get(node, 0) + 1 self.count_dict[node] = self.count_dict.get(node, 0) + 1
index += 1 index += 1
if index == len(node_list): if index == len(node_list):
...@@ -88,7 +102,7 @@ class PartitionTable(PartitionTable): ...@@ -88,7 +102,7 @@ class PartitionTable(PartitionTable):
node_list = [c.getNode() for c in row] node_list = [c.getNode() for c in row]
n = self.findLeastUsedNode(node_list) n = self.findLeastUsedNode(node_list)
if n is not None: if n is not None:
row.append(neo.lib.pt.Cell(n, row.append(Cell(n,
CellStates.OUT_OF_DATE)) CellStates.OUT_OF_DATE))
self.count_dict[n] += 1 self.count_dict[n] += 1
cell_list.append((offset, n.getUUID(), cell_list.append((offset, n.getUUID(),
...@@ -132,11 +146,11 @@ class PartitionTable(PartitionTable): ...@@ -132,11 +146,11 @@ class PartitionTable(PartitionTable):
# check the partition is assigned and known as outdated # check the partition is assigned and known as outdated
for cell in self.getCellList(offset): for cell in self.getCellList(offset):
if cell.getUUID() == uuid: if cell.getUUID() == uuid:
if not cell.isOutOfDate(): if cell.isOutOfDate():
raise PartitionTableException('Non-oudated partition') break
break return
else: else:
raise PartitionTableException('Non-assigned partition') raise neo.lib.pt.PartitionTableException('Non-assigned partition')
# update the partition table # update the partition table
cell_list = [self.setCell(offset, node, CellStates.UP_TO_DATE)] cell_list = [self.setCell(offset, node, CellStates.UP_TO_DATE)]
...@@ -177,7 +191,7 @@ class PartitionTable(PartitionTable): ...@@ -177,7 +191,7 @@ class PartitionTable(PartitionTable):
else: else:
if num_cells <= self.nr: if num_cells <= self.nr:
row.append(neo.lib.pt.Cell(node, CellStates.OUT_OF_DATE)) row.append(Cell(node, CellStates.OUT_OF_DATE))
cell_list.append((offset, node.getUUID(), cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE)) CellStates.OUT_OF_DATE))
node_count += 1 node_count += 1
...@@ -196,7 +210,7 @@ class PartitionTable(PartitionTable): ...@@ -196,7 +210,7 @@ class PartitionTable(PartitionTable):
CellStates.FEEDING)) CellStates.FEEDING))
# Don't count a feeding cell. # Don't count a feeding cell.
self.count_dict[max_cell.getNode()] -= 1 self.count_dict[max_cell.getNode()] -= 1
row.append(neo.lib.pt.Cell(node, CellStates.OUT_OF_DATE)) row.append(Cell(node, CellStates.OUT_OF_DATE))
cell_list.append((offset, node.getUUID(), cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE)) CellStates.OUT_OF_DATE))
node_count += 1 node_count += 1
...@@ -277,7 +291,7 @@ class PartitionTable(PartitionTable): ...@@ -277,7 +291,7 @@ class PartitionTable(PartitionTable):
node = self.findLeastUsedNode([cell.getNode() for cell in row]) node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is None: if node is None:
break break
row.append(neo.lib.pt.Cell(node, CellStates.OUT_OF_DATE)) row.append(Cell(node, CellStates.OUT_OF_DATE))
changed_cell_list.append((offset, node.getUUID(), changed_cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE)) CellStates.OUT_OF_DATE))
self.count_dict[node] += 1 self.count_dict[node] += 1
...@@ -309,6 +323,13 @@ class PartitionTable(PartitionTable): ...@@ -309,6 +323,13 @@ class PartitionTable(PartitionTable):
CellStates.OUT_OF_DATE)) CellStates.OUT_OF_DATE))
return change_list return change_list
def iterNodeCell(self, node):
for offset, row in enumerate(self.partition_list):
for cell in row:
if cell.getNode() is node:
yield offset, cell
break
def getUpToDateCellNodeSet(self): def getUpToDateCellNodeSet(self):
""" """
Return a set of all nodes which are part of at least one UP TO DATE Return a set of all nodes which are part of at least one UP TO DATE
...@@ -329,3 +350,16 @@ class PartitionTable(PartitionTable): ...@@ -329,3 +350,16 @@ class PartitionTable(PartitionTable):
for cell in row for cell in row
if cell.isOutOfDate()) if cell.isOutOfDate())
def setBackupTidDict(self, backup_tid_dict):
for row in self.partition_list:
for cell in row:
cell.backup_tid = backup_tid_dict.get(cell.getUUID(),
ZERO_TID)
def getBackupTid(self):
try:
return min(max(cell.backup_tid for cell in row
if not cell.isOutOfDate())
for row in self.partition_list)
except ValueError:
return ZERO_TID
...@@ -33,6 +33,7 @@ class RecoveryManager(MasterHandler): ...@@ -33,6 +33,7 @@ class RecoveryManager(MasterHandler):
super(RecoveryManager, self).__init__(app) super(RecoveryManager, self).__init__(app)
# The target node's uuid to request next. # The target node's uuid to request next.
self.target_ptid = None self.target_ptid = None
self.backup_tid_dict = {}
def getHandler(self): def getHandler(self):
return self return self
...@@ -98,6 +99,9 @@ class RecoveryManager(MasterHandler): ...@@ -98,6 +99,9 @@ class RecoveryManager(MasterHandler):
app.tm.setLastOID(ZERO_OID) app.tm.setLastOID(ZERO_OID)
pt.make(allowed_node_set) pt.make(allowed_node_set)
self._broadcastPartitionTable(pt.getID(), pt.getRowList()) self._broadcastPartitionTable(pt.getID(), pt.getRowList())
elif app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid()
app.setLastTransaction(app.tm.getLastTID()) app.setLastTransaction(app.tm.getLastTID())
neo.lib.logging.debug( neo.lib.logging.debug(
...@@ -118,7 +122,7 @@ class RecoveryManager(MasterHandler): ...@@ -118,7 +122,7 @@ class RecoveryManager(MasterHandler):
# ask the last IDs to perform the recovery # ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs()) conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
# Get max values. # Get max values.
if loid is not None: if loid is not None:
self.app.tm.setLastOID(loid) self.app.tm.setLastOID(loid)
...@@ -128,6 +132,7 @@ class RecoveryManager(MasterHandler): ...@@ -128,6 +132,7 @@ class RecoveryManager(MasterHandler):
# something newer # something newer
self.target_ptid = lptid self.target_ptid = lptid
conn.ask(Packets.AskPartitionTable()) conn.ask(Packets.AskPartitionTable())
self.backup_tid_dict[conn.getUUID()] = backup_tid
def answerPartitionTable(self, conn, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
if ptid != self.target_ptid: if ptid != self.target_ptid:
...@@ -136,6 +141,7 @@ class RecoveryManager(MasterHandler): ...@@ -136,6 +141,7 @@ class RecoveryManager(MasterHandler):
dump(self.target_ptid)) dump(self.target_ptid))
else: else:
self._broadcastPartitionTable(ptid, row_list) self._broadcastPartitionTable(ptid, row_list)
self.app.backup_tid = self.backup_tid_dict[conn.getUUID()]
def _broadcastPartitionTable(self, ptid, row_list): def _broadcastPartitionTable(self, ptid, row_list):
try: try:
......
...@@ -113,19 +113,21 @@ class VerificationManager(BaseServiceHandler): ...@@ -113,19 +113,21 @@ class VerificationManager(BaseServiceHandler):
def verifyData(self): def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary.""" """Verify the data in storage nodes and clean them up, if necessary."""
app = self.app
em, nm = self.app.em, self.app.nm
# wait for any missing node # wait for any missing node
neo.lib.logging.debug('waiting for the cluster to be operational') neo.lib.logging.debug('waiting for the cluster to be operational')
while not self.app.pt.operational(): while not app.pt.operational():
em.poll(1) app.em.poll(1)
if app.backup_tid:
return
neo.lib.logging.info('start to verify data') neo.lib.logging.info('start to verify data')
getIdentifiedList = app.nm.getIdentifiedList
# Gather all unfinished transactions. # Gather all unfinished transactions.
self._askStorageNodesAndWait(Packets.AskUnfinishedTransactions(), self._askStorageNodesAndWait(Packets.AskUnfinishedTransactions(),
[x for x in self.app.nm.getIdentifiedList() if x.isStorage()]) [x for x in getIdentifiedList() if x.isStorage()])
# Gather OIDs for each unfinished TID, and verify whether the # Gather OIDs for each unfinished TID, and verify whether the
# transaction can be finished or must be aborted. This could be # transaction can be finished or must be aborted. This could be
...@@ -136,17 +138,16 @@ class VerificationManager(BaseServiceHandler): ...@@ -136,17 +138,16 @@ class VerificationManager(BaseServiceHandler):
if uuid_set is None: if uuid_set is None:
packet = Packets.DeleteTransaction(tid, self._oid_set or []) packet = Packets.DeleteTransaction(tid, self._oid_set or [])
# Make sure that no node has this transaction. # Make sure that no node has this transaction.
for node in self.app.nm.getIdentifiedList(): for node in getIdentifiedList():
if node.isStorage(): if node.isStorage():
node.notify(packet) node.notify(packet)
else: else:
packet = Packets.CommitTransaction(tid) packet = Packets.CommitTransaction(tid)
for node in self.app.nm.getIdentifiedList(pool_set=uuid_set): for node in getIdentifiedList(pool_set=uuid_set):
node.notify(packet) node.notify(packet)
self._oid_set = set() self._oid_set = set()
# If possible, send the packets now. # If possible, send the packets now.
em.poll(0) app.em.poll(0)
def verifyTransaction(self, tid): def verifyTransaction(self, tid):
em = self.app.em em = self.app.em
...@@ -189,11 +190,11 @@ class VerificationManager(BaseServiceHandler): ...@@ -189,11 +190,11 @@ class VerificationManager(BaseServiceHandler):
return uuid_set return uuid_set
def answerLastIDs(self, conn, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
# FIXME: this packet should not allowed here, the master already # FIXME: this packet should not allowed here, the master already
# accepted the current partition table end IDs. As there were manually # accepted the current partition table end IDs. As there were manually
# approved during recovery, there is no need to check them here. # approved during recovery, there is no need to check them here.
pass raise RuntimeError
def answerUnfinishedTransactions(self, conn, max_tid, tid_list): def answerUnfinishedTransactions(self, conn, max_tid, tid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
......
...@@ -54,15 +54,10 @@ UNIT_TEST_MODULES = [ ...@@ -54,15 +54,10 @@ UNIT_TEST_MODULES = [
'neo.tests.storage.testInitializationHandler', 'neo.tests.storage.testInitializationHandler',
'neo.tests.storage.testMasterHandler', 'neo.tests.storage.testMasterHandler',
'neo.tests.storage.testStorageApp', 'neo.tests.storage.testStorageApp',
'neo.tests.storage.testStorageHandler', 'neo.tests.storage.testStorage' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
'neo.tests.storage.testStorageMySQLdb',
'neo.tests.storage.testStorageBTree',
'neo.tests.storage.testVerificationHandler', 'neo.tests.storage.testVerificationHandler',
'neo.tests.storage.testIdentificationHandler', 'neo.tests.storage.testIdentificationHandler',
'neo.tests.storage.testTransactions', 'neo.tests.storage.testTransactions',
'neo.tests.storage.testReplicationHandler',
'neo.tests.storage.testReplicator',
'neo.tests.storage.testReplication',
# client application # client application
'neo.tests.client.testClientApp', 'neo.tests.client.testClientApp',
'neo.tests.client.testMasterHandler', 'neo.tests.client.testMasterHandler',
...@@ -70,6 +65,7 @@ UNIT_TEST_MODULES = [ ...@@ -70,6 +65,7 @@ UNIT_TEST_MODULES = [
'neo.tests.client.testConnectionPool', 'neo.tests.client.testConnectionPool',
# light functional tests # light functional tests
'neo.tests.threaded.test', 'neo.tests.threaded.test',
'neo.tests.threaded.testReplication',
] ]
FUNC_TEST_MODULES = [ FUNC_TEST_MODULES = [
......
...@@ -113,28 +113,21 @@ class Application(object): ...@@ -113,28 +113,21 @@ class Application(object):
"""Load persistent configuration data from the database. """Load persistent configuration data from the database.
If data is not present, generate it.""" If data is not present, generate it."""
def NoneOnKeyError(getter):
try:
return getter()
except KeyError:
return None
dm = self.dm dm = self.dm
# check cluster name # check cluster name
try: name = dm.getName()
dm_name = dm.getName() if name is None:
except KeyError:
dm.setName(self.name) dm.setName(self.name)
else: elif name != self.name:
if dm_name != self.name: raise RuntimeError('name %r does not match with the database: %r'
raise RuntimeError('name %r does not match with the ' % (self.name, dm_name))
'database: %r' % (self.name, dm_name))
# load configuration # load configuration
self.uuid = NoneOnKeyError(dm.getUUID) self.uuid = dm.getUUID()
num_partitions = NoneOnKeyError(dm.getNumPartitions) num_partitions = dm.getNumPartitions()
num_replicas = NoneOnKeyError(dm.getNumReplicas) num_replicas = dm.getNumReplicas()
ptid = NoneOnKeyError(dm.getPTID) ptid = dm.getPTID()
# check partition table configuration # check partition table configuration
if num_partitions is not None and num_replicas is not None: if num_partitions is not None and num_replicas is not None:
...@@ -152,10 +145,7 @@ class Application(object): ...@@ -152,10 +145,7 @@ class Application(object):
def loadPartitionTable(self): def loadPartitionTable(self):
"""Load a partition table from the database.""" """Load a partition table from the database."""
try: ptid = self.dm.getPTID()
ptid = self.dm.getPTID()
except KeyError:
ptid = None
cell_list = self.dm.getPartitionTable() cell_list = self.dm.getPartitionTable()
new_cell_list = [] new_cell_list = []
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
...@@ -216,9 +206,7 @@ class Application(object): ...@@ -216,9 +206,7 @@ class Application(object):
except OperationFailure, msg: except OperationFailure, msg:
neo.lib.logging.error('operation stopped: %s', msg) neo.lib.logging.error('operation stopped: %s', msg)
except PrimaryFailure, msg: except PrimaryFailure, msg:
self.replicator.masterLost()
neo.lib.logging.error('primary master is down: %s', msg) neo.lib.logging.error('primary master is down: %s', msg)
self.master_node = None
def connectToPrimary(self): def connectToPrimary(self):
"""Find a primary master node, and connect to it. """Find a primary master node, and connect to it.
...@@ -296,6 +284,7 @@ class Application(object): ...@@ -296,6 +284,7 @@ class Application(object):
neo.lib.logging.info('doing operation') neo.lib.logging.info('doing operation')
_poll = self._poll _poll = self._poll
isIdle = self.em.isIdle
handler = master.MasterOperationHandler(self) handler = master.MasterOperationHandler(self)
self.master_conn.setHandler(handler) self.master_conn.setHandler(handler)
...@@ -304,16 +293,21 @@ class Application(object): ...@@ -304,16 +293,21 @@ class Application(object):
self.dm.dropUnfinishedData() self.dm.dropUnfinishedData()
self.tm.reset() self.tm.reset()
while True: self.task_queue = task_queue = deque()
_poll() try:
if self.replicator.pending(): while True:
# Call processDelayedTasks before act, so tasks added in the while task_queue and isIdle():
# act call are executed after one poll call, so that sent try:
# packets are already on the network and delayed task task_queue[-1].next()
# processing happens in parallel with the same task on the task_queue.rotate()
# other storage node. except StopIteration:
self.replicator.processDelayedTasks() task_queue.pop()
self.replicator.act() _poll()
finally:
del self.task_queue
# Abort any replication, whether we are feeding or out-of-date.
for node in self.nm.getStorageList(only_identified=True):
node.getConnection().close()
def wait(self): def wait(self):
# change handler # change handler
...@@ -368,6 +362,13 @@ class Application(object): ...@@ -368,6 +362,13 @@ class Application(object):
neo.lib.logging.info(' %r:%r: %r:%r %r %r', key, event.__name__, neo.lib.logging.info(' %r:%r: %r:%r %r %r', key, event.__name__,
_msg_id, _conn, args) _msg_id, _conn, args)
def newTask(self, iterator):
try:
iterator.next()
except StopIteration:
return
self.task_queue.appendleft(iterator)
def shutdown(self, erase=False): def shutdown(self, erase=False):
"""Close all connections and exit""" """Close all connections and exit"""
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
......
...@@ -15,10 +15,13 @@ ...@@ -15,10 +15,13 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
LOG_QUERIES = False
from neo.lib.exception import DatabaseFailure from neo.lib.exception import DatabaseFailure
from .manager import DatabaseManager from .manager import DatabaseManager
from .sqlite import SQLiteDatabaseManager
DATABASE_MANAGER_DICT = {} DATABASE_MANAGER_DICT = {'SQLite': SQLiteDatabaseManager}
try: try:
from .mysqldb import MySQLDatabaseManager from .mysqldb import MySQLDatabaseManager
...@@ -27,17 +30,6 @@ except ImportError: ...@@ -27,17 +30,6 @@ except ImportError:
else: else:
DATABASE_MANAGER_DICT['MySQL'] = MySQLDatabaseManager DATABASE_MANAGER_DICT['MySQL'] = MySQLDatabaseManager
try:
from .btree import BTreeDatabaseManager
except ImportError:
pass
else:
# XXX: warning: name might change in the future.
DATABASE_MANAGER_DICT['BTree'] = BTreeDatabaseManager
if not DATABASE_MANAGER_DICT:
raise ImportError('No database back-end available.')
def buildDatabaseManager(name, args=(), kw={}): def buildDatabaseManager(name, args=(), kw={}):
if name is None: if name is None:
name = DATABASE_MANAGER_DICT.keys()[0] name = DATABASE_MANAGER_DICT.keys()[0]
......
This diff is collapsed.
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
import neo.lib import neo.lib
from neo.lib import util from neo.lib import util
from neo.lib.exception import DatabaseFailure from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_TID
class CreationUndone(Exception): class CreationUndone(Exception):
pass pass
...@@ -37,34 +38,6 @@ class DatabaseManager(object): ...@@ -37,34 +38,6 @@ class DatabaseManager(object):
"""Called during instanciation, to process database parameter.""" """Called during instanciation, to process database parameter."""
pass pass
def isUnderTransaction(self):
return self._under_transaction
def begin(self):
"""
Begin a transaction
"""
if self._under_transaction:
raise DatabaseFailure('A transaction has already begun')
self._begin()
self._under_transaction = True
def commit(self):
"""
Commit the current transaction
"""
if not self._under_transaction:
raise DatabaseFailure('The transaction has not begun')
self._commit()
self._under_transaction = False
def rollback(self):
"""
Rollback the current transaction
"""
self._rollback()
self._under_transaction = False
def setup(self, reset = 0): def setup(self, reset = 0):
"""Set up a database """Set up a database
...@@ -79,14 +52,33 @@ class DatabaseManager(object): ...@@ -79,14 +52,33 @@ class DatabaseManager(object):
""" """
raise NotImplementedError raise NotImplementedError
def _begin(self): def __enter__(self):
raise NotImplementedError """
Begin a transaction
"""
if self._under_transaction:
raise DatabaseFailure('A transaction has already begun')
r = self.begin()
self._under_transaction = True
return r
def _commit(self): def __exit__(self, exc_type, exc_value, tb):
raise NotImplementedError if not self._under_transaction:
raise DatabaseFailure('The transaction has not begun')
self._under_transaction = False
if exc_type is None:
self.commit()
else:
self.rollback()
def _rollback(self): def begin(self):
raise NotImplementedError pass
def commit(self):
pass
def rollback(self):
pass
def _getPartition(self, oid_or_tid): def _getPartition(self, oid_or_tid):
return oid_or_tid % self.getNumPartitions() return oid_or_tid % self.getNumPartitions()
...@@ -104,13 +96,8 @@ class DatabaseManager(object): ...@@ -104,13 +96,8 @@ class DatabaseManager(object):
if self._under_transaction: if self._under_transaction:
self._setConfiguration(key, value) self._setConfiguration(key, value)
else: else:
self.begin() with self:
try:
self._setConfiguration(key, value) self._setConfiguration(key, value)
except:
self.rollback()
raise
self.commit()
def _setConfiguration(self, key, value): def _setConfiguration(self, key, value):
raise NotImplementedError raise NotImplementedError
...@@ -171,7 +158,9 @@ class DatabaseManager(object): ...@@ -171,7 +158,9 @@ class DatabaseManager(object):
""" """
Load a Partition Table ID from a database. Load a Partition Table ID from a database.
""" """
return long(self.getConfiguration('ptid')) ptid = self.getConfiguration('ptid')
if ptid is not None:
return long(ptid)
def setPTID(self, ptid): def setPTID(self, ptid):
""" """
...@@ -194,18 +183,31 @@ class DatabaseManager(object): ...@@ -194,18 +183,31 @@ class DatabaseManager(object):
""" """
self.setConfiguration('loid', util.dump(loid)) self.setConfiguration('loid', util.dump(loid))
def getBackupTID(self):
return util.bin(self.getConfiguration('backup_tid'))
def setBackupTID(self, backup_tid):
return self.setConfiguration('backup_tid', util.dump(backup_tid))
def getPartitionTable(self): def getPartitionTable(self):
"""Return a whole partition table as a tuple of rows. Each row """Return a whole partition table as a tuple of rows. Each row
is again a tuple of an offset (row ID), an UUID of a storage is again a tuple of an offset (row ID), an UUID of a storage
node, and a cell state.""" node, and a cell state."""
raise NotImplementedError raise NotImplementedError
def getLastTID(self, all = True): def _getLastTIDs(self, all=True):
"""Return the last TID in a database. If all is true,
unfinished transactions must be taken account into. If there
is no TID in the database, return None."""
raise NotImplementedError raise NotImplementedError
def getLastTIDs(self, all=True):
trans, obj = self._getLastTIDs()
if trans:
tid = max(trans.itervalues())
if obj:
tid = max(tid, max(obj.itervalues()))
else:
tid = max(obj.itervalues()) if obj else None
return tid, trans, obj
def getUnfinishedTIDList(self): def getUnfinishedTIDList(self):
"""Return a list of unfinished transaction's IDs.""" """Return a list of unfinished transaction's IDs."""
raise NotImplementedError raise NotImplementedError
...@@ -352,13 +354,8 @@ class DatabaseManager(object): ...@@ -352,13 +354,8 @@ class DatabaseManager(object):
else: else:
del refcount[data_id] del refcount[data_id]
if prune: if prune:
self.begin() with self:
try:
self._pruneData(data_id_list) self._pruneData(data_id_list)
except:
self.rollback()
raise
self.commit()
__getDataTID = set() __getDataTID = set()
def _getDataTID(self, oid, tid=None, before_tid=None): def _getDataTID(self, oid, tid=None, before_tid=None):
...@@ -466,23 +463,24 @@ class DatabaseManager(object): ...@@ -466,23 +463,24 @@ class DatabaseManager(object):
an oid list""" an oid list"""
raise NotImplementedError raise NotImplementedError
def deleteTransactionsAbove(self, partition, tid, max_tid):
"""Delete all transactions above given TID (inclued) in given
partition, but never above max_tid (in case transactions are committed
during replication)."""
raise NotImplementedError
def deleteObject(self, oid, serial=None): def deleteObject(self, oid, serial=None):
"""Delete given object. If serial is given, only delete that serial for """Delete given object. If serial is given, only delete that serial for
given oid.""" given oid."""
raise NotImplementedError raise NotImplementedError
def deleteObjectsAbove(self, partition, oid, serial, max_tid): def _deleteRange(self, partition, min_tid=None, max_tid=None):
"""Delete all objects above given OID and serial (inclued) in given """Delete all objects and transactions between given min_tid (excluded)
partition, but never above max_tid (in case objects are stored during and max_tid (included)"""
replication)"""
raise NotImplementedError raise NotImplementedError
def truncate(self, tid):
assert tid not in (None, ZERO_TID), tid
with self:
assert self.getBackupTID()
self.setBackupTID(tid)
for partition in xrange(self.getNumPartitions()):
self._deleteRange(partition, tid)
def getTransaction(self, tid, all = False): def getTransaction(self, tid, all = False):
"""Return a tuple of the list of OIDs, user information, """Return a tuple of the list of OIDs, user information,
a description, and extension information, for a given transaction a description, and extension information, for a given transaction
...@@ -498,10 +496,10 @@ class DatabaseManager(object): ...@@ -498,10 +496,10 @@ class DatabaseManager(object):
If there is no such object ID in a database, return None.""" If there is no such object ID in a database, return None."""
raise NotImplementedError raise NotImplementedError
def getObjectHistoryFrom(self, oid, min_serial, max_serial, length, def getReplicationObjectList(self, min_tid, max_tid, length, partition,
partition): min_oid):
"""Return a dict of length serials grouped by oid at (or above) """Return a dict of length oids grouped by serial at (or above)
min_oid and min_serial and below max_serial, for given partition, min_tid and min_oid and below max_tid, for given partition,
sorted in ascending order.""" sorted in ascending order."""
raise NotImplementedError raise NotImplementedError
......
This diff is collapsed.
This diff is collapsed.
...@@ -18,15 +18,15 @@ ...@@ -18,15 +18,15 @@
import neo import neo
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib import protocol
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.exception import PrimaryFailure, OperationFailure from neo.lib.exception import PrimaryFailure, OperationFailure
from neo.lib.protocol import NodeStates, NodeTypes, Packets, Errors, ZERO_HASH from neo.lib.protocol import NodeStates, NodeTypes
class BaseMasterHandler(EventHandler): class BaseMasterHandler(EventHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
if self.app.listening_conn: # if running if self.app.listening_conn: # if running
self.app.master_node = None
raise PrimaryFailure('connection lost') raise PrimaryFailure('connection lost')
def stopOperation(self, conn): def stopOperation(self, conn):
...@@ -62,44 +62,5 @@ class BaseMasterHandler(EventHandler): ...@@ -62,44 +62,5 @@ class BaseMasterHandler(EventHandler):
dump(uuid)) dump(uuid))
self.app.tm.abortFor(uuid) self.app.tm.abortFor(uuid)
def answerUnfinishedTransactions(self, conn, *args, **kw):
class BaseClientAndStorageOperationHandler(EventHandler): self.app.replicator.setUnfinishedTIDList(*args, **kw)
""" Accept requests common to client and storage nodes """
def askTransactionInformation(self, conn, tid):
app = self.app
t = app.dm.getTransaction(tid)
if t is None:
p = Errors.TidNotFound('%s does not exist' % dump(tid))
else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[4], t[0])
conn.answer(p)
def _askObject(self, oid, serial, tid):
raise NotImplementedError
def askObject(self, conn, oid, serial, tid):
app = self.app
if self.app.tm.loadLocked(oid):
# Delay the response.
app.queueEvent(self.askObject, conn, (oid, serial, tid))
return
o = self._askObject(oid, serial, tid)
if o is None:
neo.lib.logging.debug('oid = %s does not exist', dump(oid))
p = Errors.OidDoesNotExist(dump(oid))
elif o is False:
neo.lib.logging.debug('oid = %s not found', dump(oid))
p = Errors.OidNotFound(dump(oid))
else:
serial, next_serial, compression, checksum, data, data_serial = o
neo.lib.logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial))
if checksum is None:
checksum = ZERO_HASH
data = ''
p = Packets.AnswerObject(oid, serial, next_serial,
compression, checksum, data, data_serial)
conn.answer(p)
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import neo.lib import neo.lib
from neo.lib import protocol from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum from neo.lib.util import dump, makeChecksum
from neo.lib.protocol import Packets, LockState, Errors, ZERO_HASH from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
from . import BaseClientAndStorageOperationHandler ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError from ..transactions import ConflictError, DelayedError
from ..exception import AlreadyPendingError from ..exception import AlreadyPendingError
import time import time
...@@ -28,10 +28,40 @@ import time ...@@ -28,10 +28,40 @@ import time
# Set to None to disable. # Set to None to disable.
SLOW_STORE = 2 SLOW_STORE = 2
class ClientOperationHandler(BaseClientAndStorageOperationHandler): class ClientOperationHandler(EventHandler):
def _askObject(self, oid, serial, ttid): def askTransactionInformation(self, conn, tid):
return self.app.dm.getObject(oid, serial, ttid) t = self.app.dm.getTransaction(tid)
if t is None:
p = Errors.TidNotFound('%s does not exist' % dump(tid))
else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[4], t[0])
conn.answer(p)
def askObject(self, conn, oid, serial, tid):
app = self.app
if app.tm.loadLocked(oid):
# Delay the response.
app.queueEvent(self.askObject, conn, (oid, serial, tid))
return
o = app.dm.getObject(oid, serial, tid)
if o is None:
neo.lib.logging.debug('oid = %s does not exist', dump(oid))
p = Errors.OidDoesNotExist(dump(oid))
elif o is False:
neo.lib.logging.debug('oid = %s not found', dump(oid))
p = Errors.OidNotFound(dump(oid))
else:
serial, next_serial, compression, checksum, data, data_serial = o
neo.lib.logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial))
if checksum is None:
checksum = ZERO_HASH
data = ''
p = Packets.AnswerObject(oid, serial, next_serial,
compression, checksum, data, data_serial)
conn.answer(p)
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -96,22 +126,18 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -96,22 +126,18 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
self._askStoreObject(conn, oid, serial, compression, checksum, data, self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, ttid, unlock, time.time()) data_serial, ttid, unlock, time.time())
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list): def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
getReplicationTIDList = self.app.dm.getReplicationTIDList conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
tid_list = [] min_tid, max_tid, length, partition)))
extend = tid_list.extend
for partition in partition_list:
extend(getReplicationTIDList(min_tid, max_tid, length, partition))
conn.answer(Packets.AnswerTIDsFrom(tid_list))
def askTIDs(self, conn, first, last, partition): def askTIDs(self, conn, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
raise protocol.ProtocolError('invalid offsets') raise ProtocolError('invalid offsets')
app = self.app app = self.app
if partition == protocol.INVALID_PARTITION: if partition == INVALID_PARTITION:
partition_list = app.pt.getAssignedPartitionList(app.uuid) partition_list = app.pt.getAssignedPartitionList(app.uuid)
else: else:
partition_list = [partition] partition_list = [partition]
...@@ -149,7 +175,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -149,7 +175,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def askObjectHistory(self, conn, oid, first, last): def askObjectHistory(self, conn, oid, first, last):
if first >= last: if first >= last:
raise protocol.ProtocolError( 'invalid offsets') raise ProtocolError('invalid offsets')
app = self.app app = self.app
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistory(oid, first, last - first)
......
...@@ -21,6 +21,7 @@ from neo.lib.handler import EventHandler ...@@ -21,6 +21,7 @@ from neo.lib.handler import EventHandler
from neo.lib.protocol import NodeTypes, Packets, NotReadyError from neo.lib.protocol import NodeTypes, Packets, NotReadyError
from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
from neo.lib.util import dump from neo.lib.util import dump
from .storage import StorageOperationHandler
class IdentificationHandler(EventHandler): class IdentificationHandler(EventHandler):
""" Handler used for incoming connections during operation state """ """ Handler used for incoming connections during operation state """
...@@ -35,37 +36,42 @@ class IdentificationHandler(EventHandler): ...@@ -35,37 +36,42 @@ class IdentificationHandler(EventHandler):
if not self.app.ready: if not self.app.ready:
raise NotReadyError raise NotReadyError
app = self.app app = self.app
node = app.nm.getByUUID(uuid) if uuid is None:
# If this node is broken, reject it. if node_type != NodeTypes.STORAGE:
if node is not None and node.isBroken(): raise ProtocolError('reject anonymous non-storage node')
raise BrokenNodeDisallowedError handler = StorageOperationHandler(self.app)
# choose the handler according to the node type conn.setHandler(handler)
if node_type == NodeTypes.CLIENT:
from .client import ClientOperationHandler
handler = ClientOperationHandler
if node is None:
node = app.nm.createClient(uuid=uuid)
elif node.isConnected():
# cut previous connection
node.getConnection().close()
assert not node.isConnected()
node.setRunning()
elif node_type == NodeTypes.STORAGE:
from .storage import StorageOperationHandler
handler = StorageOperationHandler
if node is None:
neo.lib.logging.error('reject an unknown storage node %s',
dump(uuid))
raise NotReadyError
else: else:
raise ProtocolError('reject non-client-or-storage node') if uuid == app.uuid:
# apply the handler and set up the connection raise ProtocolError("uuid conflict or loopback connection")
handler = handler(self.app) node = app.nm.getByUUID(uuid)
conn.setHandler(handler) # If this node is broken, reject it.
node.setConnection(conn) if node is not None and node.isBroken():
args = (NodeTypes.STORAGE, app.uuid, app.pt.getPartitions(), raise BrokenNodeDisallowedError
app.pt.getReplicas(), uuid) # choose the handler according to the node type
if node_type == NodeTypes.CLIENT:
from .client import ClientOperationHandler
handler = ClientOperationHandler
if node is None:
node = app.nm.createClient(uuid=uuid)
elif node.isConnected():
# cut previous connection
node.getConnection().close()
assert not node.isConnected()
node.setRunning()
elif node_type == NodeTypes.STORAGE:
if node is None:
neo.lib.logging.error('reject an unknown storage node %s',
dump(uuid))
raise NotReadyError
handler = StorageOperationHandler
else:
raise ProtocolError('reject non-client-or-storage node')
# apply the handler and set up the connection
handler = handler(self.app)
conn.setHandler(handler)
node.setConnection(conn, app.uuid < uuid)
# accept the identification and trigger an event # accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(*args)) conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid))
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
...@@ -25,10 +25,6 @@ class InitializationHandler(BaseMasterHandler): ...@@ -25,10 +25,6 @@ class InitializationHandler(BaseMasterHandler):
def answerNodeInformation(self, conn): def answerNodeInformation(self, conn):
pass pass
def notifyNodeInformation(self, conn, node_list):
# the whole node list is received here
BaseMasterHandler.notifyNodeInformation(self, conn, node_list)
def answerPartitionTable(self, conn, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
app = self.app app = self.app
pt = app.pt pt = app.pt
...@@ -53,8 +49,9 @@ class InitializationHandler(BaseMasterHandler): ...@@ -53,8 +49,9 @@ class InitializationHandler(BaseMasterHandler):
app.dm.setPartitionTable(ptid, cell_list) app.dm.setPartitionTable(ptid, cell_list)
def answerLastIDs(self, conn, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
self.app.dm.setLastOID(loid) self.app.dm.setLastOID(loid)
self.app.dm.setBackupTID(backup_tid)
def notifyPartitionChanges(self, conn, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
# XXX: This is safe to ignore those notifications because all of the # XXX: This is safe to ignore those notifications because all of the
......
...@@ -24,11 +24,8 @@ from . import BaseMasterHandler ...@@ -24,11 +24,8 @@ from . import BaseMasterHandler
class MasterOperationHandler(BaseMasterHandler): class MasterOperationHandler(BaseMasterHandler):
""" This handler is used for the primary master """ """ This handler is used for the primary master """
def answerUnfinishedTransactions(self, conn, max_tid, ttid_list): def notifyTransactionFinished(self, conn, *args, **kw):
self.app.replicator.setUnfinishedTIDList(max_tid, ttid_list) self.app.replicator.transactionFinished(*args, **kw)
def notifyTransactionFinished(self, conn, ttid, max_tid):
self.app.replicator.transactionFinished(ttid, max_tid)
def notifyPartitionChanges(self, conn, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
...@@ -44,14 +41,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -44,14 +41,7 @@ class MasterOperationHandler(BaseMasterHandler):
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
# Check changes for replications # Check changes for replications
if app.replicator is not None: app.replicator.notifyPartitionChanges(cell_list)
for offset, uuid, state in cell_list:
if uuid == app.uuid:
# If this is for myself, this can affect replications.
if state == CellStates.DISCARDED:
app.replicator.removePartition(offset)
elif state == CellStates.OUT_OF_DATE:
app.replicator.addPartition(offset)
def askLockInformation(self, conn, ttid, tid, oid_list): def askLockInformation(self, conn, ttid, tid, oid_list):
if not ttid in self.app.tm: if not ttid in self.app.tm:
...@@ -74,3 +64,11 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -74,3 +64,11 @@ class MasterOperationHandler(BaseMasterHandler):
if not conn.isClosed(): if not conn.isClosed():
conn.answer(Packets.AnswerPack(True)) conn.answer(Packets.AnswerPack(True))
def replicate(self, conn, tid, upstream_name, source_dict):
self.app.replicator.backup(tid,
dict((p, (a, upstream_name))
for p, a in source_dict.iteritems()))
def askTruncate(self, conn, tid):
self.app.dm.truncate(tid)
conn.answer(Packets.AnswerTruncate())
This diff is collapsed.
...@@ -15,36 +15,101 @@ ...@@ -15,36 +15,101 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from . import BaseClientAndStorageOperationHandler import weakref
from neo.lib.protocol import Packets from functools import wraps
import neo.lib
from neo.lib.connector import ConnectorConnectionClosedException
from neo.lib.handler import EventHandler
from neo.lib.protocol import Errors, NodeStates, Packets, \
ZERO_HASH, ZERO_TID, ZERO_OID
from neo.lib.util import add64, u64
class StorageOperationHandler(BaseClientAndStorageOperationHandler): def checkConnectionIsReplicatorConnection(func):
def decorator(self, conn, *args, **kw):
assert self.app.replicator.getCurrentConnection() is conn
return func(self, conn, *args, **kw)
return wraps(func)(decorator)
def _askObject(self, oid, serial, tid): class StorageOperationHandler(EventHandler):
result = self.app.dm.getObject(oid, serial, tid) """This class handles events for replications."""
if result and result[5]:
return result[:2] + (None, None, None) + result[4:]
return result
def askLastIDs(self, conn): def connectionLost(self, conn, new_state):
app = self.app if self.app.listening_conn and conn.isClient():
oid = app.dm.getLastOID() # XXX: Connection and Node should merged.
tid = app.dm.getLastTID() uuid = conn.getUUID()
conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID())) if uuid:
node = self.app.nm.getByUUID(uuid)
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list): else:
assert len(partition_list) == 1, partition_list node = self.app.nm.getByAddress(conn.getAddress())
tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid, length, node.setState(NodeStates.DOWN)
partition_list[0]) replicator = self.app.replicator
conn.answer(Packets.AnswerTIDsFrom(tid_list)) if replicator.current_node is node:
replicator.abort()
def askObjectHistoryFrom(self, conn, min_oid, min_serial, max_serial,
length, partition): # Client
object_dict = self.app.dm.getObjectHistoryFrom(min_oid, min_serial,
max_serial, length, partition) def connectionFailed(self, conn):
conn.answer(Packets.AnswerObjectHistoryFrom(object_dict)) if self.app.listening_conn:
self.app.replicator.abort()
@checkConnectionIsReplicatorConnection
def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid):
self.app.replicator.fetchTransactions()
@checkConnectionIsReplicatorConnection
def answerFetchTransactions(self, conn, pack_tid, next_tid, tid_list):
if tid_list:
deleteTransaction = self.app.dm.deleteTransaction
for tid in tid_list:
deleteTransaction(tid)
assert not pack_tid, "TODO"
if next_tid:
self.app.replicator.fetchTransactions(next_tid)
else:
self.app.replicator.fetchObjects()
@checkConnectionIsReplicatorConnection
def addTransaction(self, conn, tid, user, desc, ext, packed, oid_list):
# Directly store the transaction.
self.app.dm.storeTransaction(tid, (),
(oid_list, user, desc, ext, packed), False)
@checkConnectionIsReplicatorConnection
def answerFetchObjects(self, conn, pack_tid, next_tid,
next_oid, object_dict):
if object_dict:
deleteObject = self.app.dm.deleteObject
for serial, oid_list in object_dict.iteritems():
for oid in oid_list:
delObject(oid, serial)
assert not pack_tid, "TODO"
if next_tid:
self.app.replicator.fetchObjects(next_tid, next_oid)
else:
self.app.replicator.finish()
@checkConnectionIsReplicatorConnection
def addObject(self, conn, oid, serial, compression,
checksum, data, data_serial):
dm = self.app.dm
if data or checksum != ZERO_HASH:
data_id = dm.storeData(checksum, data, compression)
else:
data_id = None
# Directly store the transaction.
obj = oid, data_id, data_serial
dm.storeTransaction(serial, (obj,), None, False)
@checkConnectionIsReplicatorConnection
def replicationError(self, conn, message):
self.app.replicator.abort('source message: ' + message)
# Server (all methods must set connection as server so that it isn't closed
# if client tasks are finished)
def askCheckTIDRange(self, conn, min_tid, max_tid, length, partition): def askCheckTIDRange(self, conn, min_tid, max_tid, length, partition):
conn.asServer()
count, tid_checksum, max_tid = self.app.dm.checkTIDRange(min_tid, count, tid_checksum, max_tid = self.app.dm.checkTIDRange(min_tid,
max_tid, length, partition) max_tid, length, partition)
conn.answer(Packets.AnswerCheckTIDRange(min_tid, length, conn.answer(Packets.AnswerCheckTIDRange(min_tid, length,
...@@ -52,9 +117,91 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -52,9 +117,91 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
def askCheckSerialRange(self, conn, min_oid, min_serial, max_tid, length, def askCheckSerialRange(self, conn, min_oid, min_serial, max_tid, length,
partition): partition):
conn.asServer()
count, oid_checksum, max_oid, serial_checksum, max_serial = \ count, oid_checksum, max_oid, serial_checksum, max_serial = \
self.app.dm.checkSerialRange(min_oid, min_serial, max_tid, length, self.app.dm.checkSerialRange(min_oid, min_serial, max_tid, length,
partition) partition)
conn.answer(Packets.AnswerCheckSerialRange(min_oid, min_serial, length, conn.answer(Packets.AnswerCheckSerialRange(min_oid, min_serial, length,
count, oid_checksum, max_oid, serial_checksum, max_serial)) count, oid_checksum, max_oid, serial_checksum, max_serial))
def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
tid_list):
app = self.app
cell = app.pt.getCell(partition, app.uuid)
if cell is None or cell.isOutOfDate():
return conn.answer(Errors.ReplicationError(
"partition %u not readable" % partition))
conn.asServer()
msg_id = conn.getPeerId()
conn = weakref.proxy(conn)
peer_tid_set = set(tid_list)
dm = app.dm
tid_list = dm.getReplicationTIDList(min_tid, max_tid, length + 1,
partition)
next_tid = tid_list.pop() if length < len(tid_list) else None
def push():
try:
pack_tid = None # TODO
for tid in tid_list:
if tid in peer_tid_set:
peer_tid_set.remove(tid)
else:
t = dm.getTransaction(tid)
if t is None:
conn.answer(Errors.ReplicationError(
"partition %u dropped" % partition))
return
oid_list, user, desc, ext, packed = t
conn.notify(Packets.AddTransaction(
tid, user, desc, ext, packed, oid_list))
yield
conn.answer(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id)
yield
except (weakref.ReferenceError, ConnectorConnectionClosedException):
pass
app.newTask(push())
def askFetchObjects(self, conn, partition, length, min_tid, max_tid,
min_oid, object_dict):
app = self.app
cell = app.pt.getCell(partition, app.uuid)
if cell is None or cell.isOutOfDate():
return conn.answer(Errors.ReplicationError(
"partition %u not readable" % partition))
conn.asServer()
msg_id = conn.getPeerId()
conn = weakref.proxy(conn)
dm = app.dm
object_list = dm.getReplicationObjectList(min_tid, max_tid, length,
partition, min_oid)
if length < len(object_list):
next_tid, next_oid = object_list.pop()
else:
next_tid = next_oid = None
def push():
try:
pack_tid = None # TODO
for serial, oid in object_list:
oid_set = object_dict.get(serial)
if oid_set:
if type(oid_set) is list:
object_dict[serial] = oid_set = set(oid_set)
if oid in oid_set:
oid_set.remove(oid)
if not oid_set:
del object_dict[serial]
continue
object = dm.getObject(oid, serial)
if object is None:
conn.answer(Errors.ReplicationError(
"partition %u dropped" % partition))
return
conn.notify(Packets.AddObject(oid, serial, *object[2:]))
yield
conn.answer(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id)
yield
except (weakref.ReferenceError, ConnectorConnectionClosedException):
pass
app.newTask(push())
...@@ -27,15 +27,11 @@ class VerificationHandler(BaseMasterHandler): ...@@ -27,15 +27,11 @@ class VerificationHandler(BaseMasterHandler):
def askLastIDs(self, conn): def askLastIDs(self, conn):
app = self.app app = self.app
try: conn.answer(Packets.AnswerLastIDs(
oid = app.dm.getLastOID() app.dm.getLastOID(),
except KeyError: app.dm.getLastTIDs()[0],
oid = None app.pt.getID(),
try: app.dm.getBackupTID()))
tid = app.dm.getLastTID()
except KeyError:
tid = None
conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
def askPartitionTable(self, conn): def askPartitionTable(self, conn):
pt = self.app.pt pt = self.app.pt
......
This diff is collapsed.
...@@ -131,6 +131,11 @@ class NeoTestBase(unittest.TestCase): ...@@ -131,6 +131,11 @@ class NeoTestBase(unittest.TestCase):
sys.stdout.write('\n') sys.stdout.write('\n')
sys.stdout.flush() sys.stdout.flush()
class failureException(AssertionError):
def __init__(self, msg=None):
neo.lib.logging.error(msg)
AssertionError.__init__(self, msg)
failIfEqual = failUnlessEqual = assertEquals = assertNotEquals = None failIfEqual = failUnlessEqual = assertEquals = assertNotEquals = None
def assertNotEqual(self, first, second, msg=None): def assertNotEqual(self, first, second, msg=None):
......
This diff is collapsed.
...@@ -234,6 +234,9 @@ class ClientTests(NEOFunctionalTest): ...@@ -234,6 +234,9 @@ class ClientTests(NEOFunctionalTest):
temp_dir=self.getTempDirectory()) temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL() neoctl = self.neo.getNEOCTL()
self.neo.start() self.neo.start()
# BUG: The following 2 lines creates 2 app, i.e. 2 TCP connections
# to the storage, so there may be a race condition at network
# level and 'st2.store' may be effective before 'st1.store'.
db1, conn1 = self.neo.getZODBConnection() db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection() db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage st1, st2 = conn1._storage, conn2._storage
......
...@@ -35,7 +35,7 @@ class ClusterTests(NEOFunctionalTest): ...@@ -35,7 +35,7 @@ class ClusterTests(NEOFunctionalTest):
def testClusterStartup(self): def testClusterStartup(self):
neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1, neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1,
adapter='MySQL', temp_dir=self.getTempDirectory()) temp_dir=self.getTempDirectory())
neoctl = neo.getNEOCTL() neoctl = neo.getNEOCTL()
neo.run() neo.run()
# Runing a new cluster doesn't exit Recovery state. # Runing a new cluster doesn't exit Recovery state.
......
This diff is collapsed.
...@@ -85,7 +85,7 @@ class MasterRecoveryTests(NeoUnitTestBase): ...@@ -85,7 +85,7 @@ class MasterRecoveryTests(NeoUnitTestBase):
self.assertTrue(ptid2 > self.app.pt.getID()) self.assertTrue(ptid2 > self.app.pt.getID())
self.assertTrue(oid2 > self.app.tm.getLastOID()) self.assertTrue(oid2 > self.app.tm.getLastOID())
self.assertTrue(tid2 > self.app.tm.getLastTID()) self.assertTrue(tid2 > self.app.tm.getLastTID())
recovery.answerLastIDs(conn, oid2, tid2, ptid2) recovery.answerLastIDs(conn, oid2, tid2, ptid2, None)
self.assertEqual(oid2, self.app.tm.getLastOID()) self.assertEqual(oid2, self.app.tm.getLastOID())
self.assertEqual(tid2, self.app.tm.getLastTID()) self.assertEqual(tid2, self.app.tm.getLastTID())
self.assertEqual(ptid2, recovery.target_ptid) self.assertEqual(ptid2, recovery.target_ptid)
......
...@@ -130,10 +130,11 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -130,10 +130,11 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
self.app.tm.setLastTID(tid) self.app.tm.setLastTID(tid)
service.askLastIDs(conn) service.askLastIDs(conn)
packet = self.checkAnswerLastIDs(conn) packet = self.checkAnswerLastIDs(conn)
loid, ltid, lptid = packet.decode() loid, ltid, lptid, backup_tid = packet.decode()
self.assertEqual(loid, oid) self.assertEqual(loid, oid)
self.assertEqual(ltid, tid) self.assertEqual(ltid, tid)
self.assertEqual(lptid, ptid) self.assertEqual(lptid, ptid)
self.assertEqual(backup_tid, None)
def test_13_askUnfinishedTransactions(self): def test_13_askUnfinishedTransactions(self):
service = self.service service = self.service
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -26,8 +26,7 @@ from neo.storage.transactions import TransactionManager, \ ...@@ -26,8 +26,7 @@ from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError DelayedError, ConflictError
from neo.lib.connection import MTClientConnection from neo.lib.connection import MTClientConnection
from neo.lib.protocol import NodeStates, Packets, ZERO_TID from neo.lib.protocol import NodeStates, Packets, ZERO_TID
from . import NEOCluster, NEOThreadedTest, \ from . import NEOCluster, NEOThreadedTest, Patch
Patch, ConnectionFilter
from neo.lib.util import makeChecksum from neo.lib.util import makeChecksum
from neo.client.pool import CELL_CONNECTED, CELL_GOOD from neo.client.pool import CELL_CONNECTED, CELL_GOOD
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment