Commit 23fad3af by Julien Muchembled

Reduce size of UUIDs to 4-bytes

1 parent 6bc3318d
Showing 56 changed files with 383 additions and 419 deletions
......@@ -31,6 +31,8 @@ Other changes are:
- Adding and removing master nodes is now easier: unknown incoming master nodes
are now accepted instead of rejected, and nodes can be given a path to a file
that maintains a list of known master nodes.
- Node UUIDs have been shortened from 16 to 4 bytes, for better performance and
easier debugging.
Also contains code clean-ups and bugfixes.
......
......@@ -30,11 +30,6 @@ RC - Review output of pylint (CODE)
Consider the need to implement a keep-alive system (packets sent
automatically when there is no activity on the connection for a period
of time).
- Factorise packet data when sending partition table cells (BANDWITH)
Currently, each cell in a partition table update contains UUIDs of all
involved nodes.
It must be changed to a correspondance table using shorter keys (sent
in the packet) to avoid repeating the same UUIDs many times.
- Consider using multicast for cluster-wide notifications. (BANDWITH)
Currently, multi-receivers notifications are sent in unicast to each
receiver. Multicast should be used.
......
......@@ -6,6 +6,7 @@ compatibility or transparent migration, so you will have to use the following
SQL commands to migrate each storage from NEO 0.10.x::
-- make sure 'tobj' & 'ttrans' are empty first
- and all storages have up-to-date partition tables
CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial;
DROP TABLE data;
RENAME TABLE new_data TO data;
......@@ -17,6 +18,12 @@ SQL commands to migrate each storage from NEO 0.10.x::
UPDATE trans SET ttid=tid;
ALTER TABLE ttrans ADD COLUMN ttid BIGINT UNSIGNED NOT NULL;
CREATE TEMPORARY TABLE uuid (new INT NOT NULL AUTO_INCREMENT PRIMARY KEY, old CHAR(32) NOT NULL, KEY (old)) ENGINE = InnoDB SELECT DISTINCT uuid as old FROM pt ORDER BY uuid;
ALTER TABLE pt DROP PRIMARY KEY, CHANGE uuid old CHAR(32) NOT NULL, ADD uuid INT NOT NULL after rid;
UPDATE pt, uuid SET pt.uuid=uuid.new WHERE pt.old=uuid.old;
ALTER TABLE pt DROP old, ADD PRIMARY KEY (rid, uuid);
UPDATE config, uuid SET config.value=uuid.new WHERE config.name='uuid' AND uuid.old=config.value;
NEO 0.10
========
......
......@@ -16,9 +16,8 @@
from neo.lib import logging, protocol
from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, Errors
from neo.lib.protocol import uuid_str, Packets, Errors
from neo.lib.exception import PrimaryFailure
from neo.lib.util import dump
def check_primary_master(func):
def wrapper(self, *args, **kw):
......@@ -38,7 +37,7 @@ class AdminEventHandler(EventHandler):
@check_primary_master
def askPartitionList(self, conn, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s",
min_offset, max_offset, dump(uuid))
min_offset, max_offset, uuid_str(uuid))
self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
@check_primary_master
......@@ -56,7 +55,7 @@ class AdminEventHandler(EventHandler):
@check_primary_master
def setNodeState(self, conn, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s", dump(uuid), state)
logging.info("set node state for %s-%s", uuid_str(uuid), state)
node = self.app.nm.getByUUID(uuid)
if node is None:
raise protocol.ProtocolError('invalid uuid')
......
......@@ -18,8 +18,7 @@ from time import sleep
from . import logging
from .handler import EventHandler
from .protocol import Packets
from .util import dump
from .protocol import uuid_str, Packets
from .connection import ClientConnection
NO_SERVER = ('0.0.0.0', 0)
......@@ -115,7 +114,7 @@ class BootstrapManager(EventHandler):
if self.uuid != your_uuid:
# got an uuid from the primary master
self.uuid = your_uuid
logging.info('Got a new UUID: %s', dump(self.uuid))
logging.info('Got a new UUID: %s', uuid_str(self.uuid))
self.accepted = True
def getPrimaryConnection(self, connector_handler):
......
......@@ -84,7 +84,9 @@ class ConfigurationManager(object):
def getUUID(self):
# only from command line
return util.bin(self.argument_list.get('uuid', None))
uuid = self.argument_list.get('uuid', None)
if uuid:
return int(uuid)
def getUpstreamCluster(self):
return self.__get('upstream_cluster', True)
......
......@@ -23,8 +23,9 @@ from .connector import ConnectorException, ConnectorTryAgainException, \
ConnectorConnectionClosedException
from .locking import RLock
from .profiling import profiler_decorator
from .protocol import Errors, PacketMalformedError, Packets, ParserState
from .util import dump, ReadBuffer
from .protocol import uuid_str, Errors, \
PacketMalformedError, Packets, ParserState
from .util import ReadBuffer
CRITICAL_TIMEOUT = 30
......@@ -297,7 +298,7 @@ class BaseConnection(object):
address = self.addr and '%s:%d' % self.addr or '?'
return '<%s(uuid=%s, address=%s, closed=%s, handler=%s) at %x>' % (
self.__class__.__name__,
dump(self.getUUID()),
uuid_str(self.getUUID()),
address,
int(self.isClosed()),
self.getHandler(),
......
......@@ -21,7 +21,6 @@
# Fortunately, SQLite allow multiple process to access the same DB,
# so an external tool should be able to dump and empty tables.
from binascii import b2a_hex
from collections import deque
from functools import wraps
from logging import getLogger, Formatter, Logger, LogRecord, StreamHandler, \
......@@ -117,6 +116,9 @@ class NEOLogger(Logger):
def setup(self, filename=None, reset=False):
self._acquire()
try:
from . import protocol as p
global uuid_str
uuid_str = p.uuid_str
if self.db is not None:
self.db.close()
if not filename:
......@@ -153,7 +155,6 @@ class NEOLogger(Logger):
date REAL PRIMARY KEY NOT NULL,
text BLOB NOT NULL)
""")
from . import protocol as p
with open(inspect.getsourcefile(p)) as p:
p = buffer(bz2.compress(p.read()))
for t, in q("SELECT text FROM protocol ORDER BY date DESC"):
......@@ -172,7 +173,7 @@ class NEOLogger(Logger):
if type(r) is PacketRecord:
ip, port = r.addr
peer = '%s %s (%s:%u)' % ('>' if r.outgoing else '<',
r.uuid and b2a_hex(r.uuid), ip, port)
uuid_str(r.uuid), ip, port)
self.db.execute("INSERT INTO packet VALUES (?,?,?,?,?,?)",
(r.created, r._name, r.msg_id, r.code, peer, buffer(r.msg)))
else:
......
......@@ -19,8 +19,7 @@ from os.path import exists, getsize
import json
from . import attributeTracker, logging
from .util import dump
from .protocol import NodeTypes, NodeStates, ProtocolError
from .protocol import uuid_str, NodeTypes, NodeStates, ProtocolError
class Node(object):
......@@ -166,7 +165,7 @@ class Node(object):
def __repr__(self):
return '<%s(uuid=%s, address=%s, state=%s, connection=%r) at %x>' % (
self.__class__.__name__,
dump(self._uuid),
uuid_str(self._uuid),
self._address,
self._state,
self._connection,
......@@ -511,16 +510,16 @@ class NodeManager(object):
elif by_address is None:
node = by_uuid
else:
raise ValueError('Got different nodes for uuid %r: %r and '
'address %r: %r.' % (dump(uuid), by_uuid, address,
raise ValueError('Got different nodes for uuid %s: %r and '
'address %r: %r.' % (uuid_str(uuid), by_uuid, address,
by_address))
if uuid is not None:
node_uuid = node.getUUID()
if node_uuid is None:
node.setUUID(uuid)
elif node_uuid != uuid:
raise ValueError('Expected uuid %r on node %r' % (
dump(uuid), node))
raise ValueError('Expected uuid %s on node %r' % (
uuid_str(uuid), node))
if address is not None:
node_address = node.getAddress()
if node_address is None:
......@@ -574,7 +573,7 @@ class NodeManager(object):
node_by_addr = self.getByAddress(addr)
node = node_by_uuid or node_by_addr
log_args = (node_type, dump(uuid), addr, state)
log_args = node_type, uuid_str(uuid), addr, state
if node is None:
if state == NodeStates.DOWN:
logging.debug('NOT creating node %s %s %s %s', *log_args)
......@@ -606,10 +605,12 @@ class NodeManager(object):
def log(self):
logging.info('Node manager : %u nodes', len(self._node_set))
for node in sorted(list(self._node_set)):
uuid = dump(node.getUUID()) or '-' * 32
node_list = [(node, uuid_str(node.getUUID()))
for node in sorted(self._node_set)]
max_len = max(len(x[1]) for x in node_list)
for node, uuid in node_list:
address = node.getAddress() or ''
if address:
address = '%s:%d' % address
logging.info(' * %32s | %8s | %22s | %s',
uuid, node.getType(), address, node.getState())
logging.info(' * %*s | %8s | %22s | %s',
max_len, uuid, node.getType(), address, node.getState())
......@@ -26,7 +26,7 @@ except ImportError:
pass
# The protocol version (major, minor).
PROTOCOL_VERSION = (9, 1)
PROTOCOL_VERSION = (10, 1)
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -154,7 +154,7 @@ cell_state_prefix_dict = {
}
# Other constants.
INVALID_UUID = '\0' * 16
INVALID_UUID = 0
INVALID_TID = '\xff' * 8
INVALID_OID = '\xff' * 8
INVALID_PARTITION = 0xffffffff
......@@ -166,12 +166,24 @@ OID_LEN = len(INVALID_OID)
TID_LEN = len(INVALID_TID)
MAX_TID = '\x7f' + '\xff' * 7 # SQLite does not accept numbers above 2^63-1
# High-order byte:
# 7 6 5 4 3 2 1 0
# | | | | +-+-+-+-- reserved (0)
# | +-+-+---------- node type
# +---------------- temporary if negative
# UUID namespaces are required to prevent conflicts when the master generate
# new uuid before it knows uuid of existing storage nodes. So only the high
# order bit is really important and the 31 other bits could be random.
# Extra namespace information and non-randomness of 3 LOB help to read logs.
UUID_NAMESPACES = {
NodeTypes.STORAGE: 'S',
NodeTypes.MASTER: 'M',
NodeTypes.CLIENT: 'C',
NodeTypes.ADMIN: 'A',
NodeTypes.STORAGE: 0x00,
NodeTypes.MASTER: -0x10,
NodeTypes.CLIENT: -0x20,
NodeTypes.ADMIN: -0x30,
}
uuid_str = (lambda ns: lambda uuid:
ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid)
)(dict((v, str(k)[0]) for k, v in UUID_NAMESPACES.iteritems()))
class ProtocolError(Exception):
""" Base class for protocol errors, close the connection """
......@@ -586,21 +598,18 @@ class PChecksum(PItem):
def _decode(self, reader):
return reader(20)
class PUUID(PItem):
class PUUID(PStructItem):
"""
An UUID (node identifier)
An UUID (node identifier, 4-bytes signed integer)
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!l')
def _encode(self, writer, uuid):
if uuid is None:
uuid = INVALID_UUID
assert len(uuid) == 16, (len(uuid), uuid)
writer(uuid)
writer(self.pack(uuid or 0))
def _decode(self, reader):
uuid = reader(16)
if uuid == INVALID_UUID:
uuid = None
return uuid
return self.unpack(reader(self.size))[0] or None
class PTID(PItem):
"""
......
......@@ -18,8 +18,8 @@ import math
from functools import wraps
from . import logging, protocol
from .protocol import CellStates
from .util import dump, u64
from .protocol import uuid_str, CellStates
from .util import u64
from .locking import RLock
class PartitionTableException(Exception):
......@@ -36,7 +36,7 @@ class Cell(object):
def __repr__(self):
return "<Cell(uuid=%s, address=%s, state=%s)>" % (
dump(self.getUUID()),
uuid_str(self.getUUID()),
self.getAddress(),
self.getState(),
)
......@@ -226,7 +226,7 @@ class PartitionTable(object):
self._id = ptid
for offset, uuid, state in cell_list:
node = nm.getByUUID(uuid)
assert node is not None, 'No node found for uuid %r' % (dump(uuid), )
assert node is not None, 'No node found for uuid ' + uuid_str(uuid)
self.setCell(offset, node, state)
logging.debug('partition table updated (ptid=%s)', ptid)
self.log()
......@@ -260,7 +260,7 @@ class PartitionTable(object):
width under 80 column).
"""
node_list = sorted(self.count_dict)
result = ['pt: node %u: %s, %s' % (i, dump(node.getUUID()),
result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
protocol.node_state_prefix_dict[node.getState()])
for i, node in enumerate(node_list)]
append = result.append
......
......@@ -20,7 +20,7 @@ from time import time
from neo.lib import logging
from neo.lib.connector import getConnectorHandler
from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID, NotReadyError
from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID, NotReadyError
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
......@@ -335,7 +335,7 @@ class Application(object):
if self.uuid is None:
self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER)
logging.info('My UUID: ' + dump(self.uuid))
logging.info('My UUID: ' + uuid_str(self.uuid))
else:
in_conflict = self.nm.getByUUID(self.uuid)
if in_conflict is not None:
......@@ -443,14 +443,16 @@ class Application(object):
self.cluster_state = state
def getNewUUID(self, uuid, address, node_type):
getByUUID = self.nm.getByUUID
if None != uuid != self.uuid:
node = self.nm.getByUUID(uuid)
node = getByUUID(uuid)
if node is None or node.getAddress() == address:
return uuid
while True:
uuid = UUID_NAMESPACES[node_type] + os.urandom(15)
if uuid != self.uuid and self.nm.getByUUID(uuid) is None:
hob = UUID_NAMESPACES[node_type]
for uuid in xrange((hob << 24) + 1, hob + 0x10 << 24):
if uuid != self.uuid and getByUUID(uuid) is None:
return uuid
raise RuntimeError
def getClusterState(self):
return self.cluster_state
......
......@@ -22,7 +22,7 @@ from neo.lib.connector import getConnectorHandler
from neo.lib.exception import PrimaryFailure
from neo.lib.node import NodeManager
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.protocol import INVALID_TID, ZERO_TID
from neo.lib.protocol import uuid_str, INVALID_TID, ZERO_TID
from neo.lib.util import add64, dump
from .app import StateChangedException
from .pt import PartitionTable
......@@ -154,9 +154,9 @@ class BackupApplication(object):
cell.replicating = tid
if cell.backup_tid < tid:
logging.debug(
"ask %s to replicate partition %u up to %s from %r",
dump(cell.getUUID()), offset, dump(tid),
dump(primary_node.getUUID()))
"ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset, dump(tid),
uuid_str(primary_node.getUUID()))
cell.getNode().getConnection().notify(p)
trigger_set.add(primary_node)
for node in trigger_set:
......@@ -238,7 +238,7 @@ class BackupApplication(object):
address_set.add(addr)
source_dict[offset] = addr
logging.debug("ask %s to replicate partition %u up to %s from %r",
dump(node.getUUID()), offset, dump(tid), addr)
uuid_str(node.getUUID()), offset, dump(tid), addr)
node.getConnection().notify(Packets.Replicate(
tid, self.name, source_dict))
......@@ -272,9 +272,9 @@ class BackupApplication(object):
if tid < max_tid:
cell.replicating = max_tid
logging.debug(
"ask %s to replicate partition %u up to %s from %r",
dump(node.getUUID()), offset, dump(max_tid),
dump(primary_node.getUUID()))
"ask %s to replicate partition %u up to %s from %s",
uuid_str(node.getUUID()), offset, dump(max_tid),
uuid_str(primary_node.getUUID()))
node.getConnection().notify(Packets.Replicate(max_tid,
'', {offset: primary_node.getAddress()}))
else:
......@@ -288,7 +288,7 @@ class BackupApplication(object):
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from"
" %r", dump(cell.getUUID()), offset, dump(tid),
dump(node.getUUID()))
" %s", uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().getConnection().notify(p)
return result
......@@ -16,10 +16,9 @@
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.protocol import (NodeTypes, NodeStates, Packets,
from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
BrokenNodeDisallowedError,
)
from neo.lib.util import dump
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
......@@ -100,7 +99,7 @@ class BaseServiceHandler(MasterHandler):
assert new_state in (NodeStates.TEMPORARILY_DOWN, NodeStates.DOWN,
NodeStates.BROKEN), new_state
assert node.getState() not in (NodeStates.TEMPORARILY_DOWN,
NodeStates.DOWN, NodeStates.BROKEN), (dump(self.app.uuid),
NodeStates.DOWN, NodeStates.BROKEN), (uuid_str(self.app.uuid),
node.whoSetState(), new_state)
was_pending = node.isPending()
node.setState(new_state)
......
......@@ -20,7 +20,7 @@ from . import MasterHandler
from ..app import StateChangedException
from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError
from neo.lib.protocol import Errors
from neo.lib.protocol import Errors, uuid_str
from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = {
......@@ -73,7 +73,7 @@ class AdministrationHandler(MasterHandler):
def setNodeState(self, conn, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s",
dump(uuid), state, modify_partition_table)
uuid_str(uuid), state, modify_partition_table)
app = self.app
node = app.nm.getByUUID(uuid)
if node is None:
......@@ -127,7 +127,7 @@ class AdministrationHandler(MasterHandler):
app.broadcastNodesInformation([node])
def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join(map(dump, uuid_list))
uuids = ', '.join(map(uuid_str, uuid_list))
logging.debug('Add nodes %s', uuids)
app = self.app
nm = app.nm
......@@ -148,7 +148,7 @@ class AdministrationHandler(MasterHandler):
logging.warning('No nodes added')
conn.answer(Errors.Ack('No nodes added'))
return
uuids = ', '.join(map(dump, uuid_set))
uuids = ', '.join(map(uuid_str, uuid_set))
logging.info('Adding nodes %s', uuids)
# switch nodes to running state
node_list = map(nm.getByUUID, uuid_set)
......
......@@ -15,12 +15,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.lib.protocol import uuid_str, NodeTypes, NodeStates, Packets
from neo.lib.protocol import NotReadyError, ProtocolError, \
UnexpectedPacketError
from neo.lib.exception import ElectionFailure
from neo.lib.handler import EventHandler
from neo.lib.util import dump
from . import MasterHandler
class BaseElectionHandler(EventHandler):
......@@ -58,8 +57,8 @@ class ClientElectionHandler(BaseElectionHandler):
def connectionFailed(self, conn):
addr = conn.getAddress()
node = self.app.nm.getByAddress(addr)
assert node is not None, (dump(self.app.uuid), addr)
assert node.isUnknown(), (dump(self.app.uuid), node.whoSetState(),
assert node is not None, (uuid_str(self.app.uuid), addr)
assert node.isUnknown(), (uuid_str(self.app.uuid), node.whoSetState(),
node)
# connection never success, node is still in unknown state
self.app.negotiating_master_node_set.discard(addr)
......@@ -92,7 +91,7 @@ class ClientElectionHandler(BaseElectionHandler):
if app.server == address:
# This is self.
assert node.getAddress() != primary or uuid == your_uuid, (
dump(uuid), dump(your_uuid))
uuid_str(uuid), uuid_str(your_uuid))
continue
n = app.nm.getByAddress(address)
if n is None:
......
......@@ -16,9 +16,8 @@
import neo
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets, \
NotReadyError, ProtocolError
NotReadyError, ProtocolError, uuid_str
from . import MasterHandler
class IdentificationHandler(MasterHandler):
......@@ -68,7 +67,7 @@ class IdentificationHandler(MasterHandler):
raise NotImplementedError(node_type)
uuid = app.getNewUUID(uuid, address, node_type)
logging.info('Accept a' + human_readable_node_type + dump(uuid))
logging.info('Accept a' + human_readable_node_type + uuid_str(uuid))
if node is None:
node = node_ctor(uuid=uuid, address=address)
node.setUUID(uuid)
......
......@@ -17,9 +17,8 @@
from . import MasterHandler
from neo.lib.handler import EventHandler
from neo.lib.exception import ElectionFailure, PrimaryFailure
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.protocol import NodeTypes, Packets, uuid_str
from neo.lib import logging
from neo.lib.util import dump
class SecondaryMasterHandler(MasterHandler):
""" Handler used by primary to handle secondary masters"""
......@@ -97,7 +96,7 @@ class PrimaryHandler(EventHandler):
if your_uuid != app.uuid:
app.uuid = your_uuid
logging.info('My UUID: ' + dump(your_uuid))
logging.info('My UUID: ' + uuid_str(your_uuid))
node.setUUID(uuid)
......@@ -19,6 +19,7 @@ from struct import pack, unpack
from neo.lib.protocol import ZERO_TID
from datetime import timedelta, datetime
from neo.lib import logging
from neo.lib.protocol import uuid_str
from neo.lib.util import dump, u64, p64
TID_LOW_OVERFLOW = 2**32
......@@ -118,7 +119,7 @@ class Transaction(object):
self._node,
dump(self._tid),
map(dump, self._oid_list or ()),
map(dump, self._uuid_set or ()),
map(uuid_str, self._uuid_set or ()),
time() - self._birth,
id(self),
)
......@@ -417,7 +418,7 @@ class TransactionManager(object):
If transaction is completely locked, calls function given at
instanciation time.
"""
logging.debug('Lock TXN %s for %s', dump(ttid), dump(uuid))
logging.debug('Lock TXN %s for %s', dump(ttid), uuid_str(uuid))
assert ttid in self._ttid_dict, "Transaction not started"
txn = self._ttid_dict[ttid]
if txn.lock(uuid) and self._queue[0][1] == ttid:
......
......@@ -15,8 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import bin, dump, p64
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, ZERO_TID
from neo.lib.util import bin, p64
from neo.lib.protocol import uuid_str, ClusterStates, NodeStates, NodeTypes, \
ZERO_TID
action_dict = {
'print': {
......@@ -57,7 +58,7 @@ class TerminalNeoCTL(object):
def formatRowList(self, row_list):
return '\n'.join('%03d | %s' % (offset,
''.join('%s - %s |' % (dump(uuid), state)
''.join('%s - %s |' % (uuid_str(uuid), state)
for (uuid, state) in cell_list))
for (offset, cell_list) in row_list)
......@@ -69,13 +70,10 @@ class TerminalNeoCTL(object):
if address is None:
address = (None, None)
ip, port = address
result.append('%s - %s - %s:%s - %s' % (node_type, dump(uuid), ip,
port, state))
result.append('%s - %s - %s:%s - %s' % (node_type, uuid_str(uuid),
ip, port, state))
return '\n'.join(result)
def formatUUID(self, uuid):
return dump(uuid)
# Actual actions
def getPartitionRowList(self, params):
"""
......@@ -185,7 +183,7 @@ class TerminalNeoCTL(object):
"""
Get primary master node.
"""
return self.formatUUID(self.neoctl.getPrimary())
return uuid_str(self.neoctl.getPrimary())
def checkReplicas(self, params):
"""
......
......@@ -50,8 +50,8 @@ class main(object):
g = {}
exec bz2.decompress(*q("SELECT text FROM protocol WHERE date<?"
" ORDER BY date DESC", (date,)).next()) in g
self.Packets = g['Packets']
self.PacketMalformedError = g['PacketMalformedError']
for x in 'uuid_str', 'Packets', 'PacketMalformedError':
setattr(self, x, g[x])
try:
self._next_protocol, = q("SELECT date FROM protocol WHERE date>=?",
(date,)).next()
......@@ -96,11 +96,17 @@ class main(object):
return "%s (%s)" % (code, message),
def notifyNodeInformation(self, node_list):
for node_type, address, uuid, state in node_list:
address = '%s:%u' % address if address else '?'
if uuid is not None:
uuid = b2a_hex(uuid)
yield ' ! %s | %8s | %22s | %s' % (uuid, node_type, address, state)
node_list.sort(key=lambda x: x[2])
node_list = [(self.uuid_str(uuid), str(node_type),
'%s:%u' % address if address else '?', state)
for node_type, address, uuid, state in node_list]
if node_list:
t = ' ! %%%us | %%%us | %%%us | %%s' % (
max(len(x[0]) for x in node_list),
max(len(x[1]) for x in node_list),
max(len(x[2]) for x in node_list))
return map(t.__mod__, node_list)
return ()
if __name__ == "__main__":
......
......@@ -18,7 +18,7 @@ import sys
from collections import deque
from neo.lib import logging
from neo.lib.protocol import NodeTypes, CellStates, Packets
from neo.lib.protocol import uuid_str, CellStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection
......@@ -138,7 +138,7 @@ class Application(object):
self.pt = PartitionTable(num_partitions, num_replicas)
logging.info('Configuration loaded:')
logging.info('UUID : %s', dump(self.uuid))
logging.info('UUID : %s', uuid_str(self.uuid))
logging.info('PTID : %s', dump(ptid))
logging.info('Name : %s', self.name)
logging.info('Partitions: %s', num_partitions)
......@@ -233,7 +233,7 @@ class Application(object):
(node, conn, uuid, num_partitions, num_replicas) = data
self.master_node = node
self.master_conn = conn
logging.info('I am %s', dump(uuid))
logging.info('I am %s', uuid_str(uuid))
self.uuid = uuid
self.dm.setUUID(uuid)
......
......@@ -104,13 +104,15 @@ class DatabaseManager(object):
"""
Load an UUID from a database.
"""
return util.bin(self.getConfiguration('uuid'))
uuid = self.getConfiguration('uuid')
if uuid is not None:
return int(uuid)
def setUUID(self, uuid):
"""
Store an UUID into a database.
"""
self.setConfiguration('uuid', util.dump(uuid))
self.setConfiguration('uuid', str(uuid))
def getNumPartitions(self):
"""
......@@ -188,7 +190,7 @@ class DatabaseManager(object):
return self.setConfiguration('backup_tid', util.dump(backup_tid))
def getPartitionTable(self):
"""Return a whole partition table as a tuple of rows. Each row
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), an UUID of a storage
node, and a cell state."""
raise NotImplementedError
......
......@@ -165,7 +165,7 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
rid INT UNSIGNED NOT NULL,
uuid CHAR(32) NOT NULL,
uuid INT NOT NULL,
state TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (rid, uuid)
) ENGINE = InnoDB""")
......@@ -267,13 +267,7 @@ class MySQLDatabaseManager(DatabaseManager):
return -1
def getPartitionTable(self):
q = self.query
cell_list = q("""SELECT rid, uuid, state FROM pt""")
pt = []
for offset, uuid, state in cell_list:
uuid = util.bin(uuid)
pt.append((offset, uuid, state))
return pt
return self.query("SELECT * FROM pt")
def _getLastTIDs(self, all=True):
p64 = util.p64
......@@ -340,21 +334,19 @@ class MySQLDatabaseManager(DatabaseManager):
return serial, next_serial, compression, checksum, data, value_serial
def doSetPartitionTable(self, ptid, cell_list, reset):
e = self.escape
offset_list = []
with self as q:
if reset:
q("""TRUNCATE pt""")
for offset, uuid, state in cell_list:
uuid = e(util.dump(uuid))
# TODO: this logic should move out of database manager
# add 'dropCells(cell_list)' to API and use one query
if state == CellStates.DISCARDED:
q("""DELETE FROM pt WHERE rid = %d AND uuid = '%s'""" \
q("""DELETE FROM pt WHERE rid = %d AND uuid = %d"""
% (offset, uuid))
else:
offset_list.append(offset)
q("""INSERT INTO pt VALUES (%d, '%s', %d)
q("""INSERT INTO pt VALUES (%d, %d, %d)
ON DUPLICATE KEY UPDATE state = %d""" \
% (offset, uuid, state, state))
self.setPTID(ptid)
......
......@@ -106,7 +106,7 @@ class SQLiteDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
rid INTEGER NOT NULL,
uuid BLOB NOT NULL,
uuid INTEGER NOT NULL,
state INTEGER NOT NULL,
PRIMARY KEY (rid, uuid))
""")
......@@ -203,9 +203,7 @@ class SQLiteDatabaseManager(DatabaseManager):
return -1
def getPartitionTable(self):
return [(offset, util.bin(uuid), state)
for offset, uuid, state in self.query(
"SELECT rid, uuid, state FROM pt")]
return self.query("SELECT * FROM pt")
def _getLastTIDs(self, all=True):
p64 = util.p64
......@@ -275,7 +273,6 @@ class SQLiteDatabaseManager(DatabaseManager):
if reset:
q("DELETE FROM pt")
for offset, uuid, state in cell_list:
uuid = buffer(util.dump(uuid))
# TODO: this logic should move out of database manager
# add 'dropCells(cell_list)' to API and use one query
# WKRD: Why does SQLite need a statement journal file
......
......@@ -16,9 +16,8 @@
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.util import dump
from neo.lib.exception import PrimaryFailure, OperationFailure
from neo.lib.protocol import NodeStates, NodeTypes
from neo.lib.protocol import uuid_str, NodeStates, NodeTypes
class BaseMasterHandler(EventHandler):
......@@ -55,8 +54,8 @@ class BaseMasterHandler(EventHandler):
elif state == NodeStates.HIDDEN:
raise OperationFailure
elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING:
logging.info('Notified of non-running client, abort (%r)',
dump(uuid))
logging.info('Notified of non-running client, abort (%s)',
uuid_str(uuid))
self.app.tm.abortFor(uuid)
def answerUnfinishedTransactions(self, conn, *args, **kw):
......
......@@ -16,9 +16,8 @@
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.protocol import NodeTypes, Packets, NotReadyError
from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets
from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
from neo.lib.util import dump
from .storage import StorageOperationHandler
from .client import ClientOperationHandler
......@@ -60,7 +59,7 @@ class IdentificationHandler(EventHandler):
elif node_type == NodeTypes.STORAGE:
if node is None:
logging.error('reject an unknown storage node %s',
dump(uuid))
uuid_str(uuid))
raise NotReadyError
handler = StorageOperationHandler
else:
......
......@@ -17,7 +17,7 @@
from time import time
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import ZERO_TID
from neo.lib.protocol import uuid_str, ZERO_TID
class ConflictError(Exception):
"""
......@@ -55,15 +55,14 @@ class Transaction(object):
self._checked_set = set()
def __repr__(self):
return "<%s(ttid=%r, tid=%r, uuid=%r, locked=%r, age=%.2fs)> at %x" % (