Commit f2796d9c authored by Julien Muchembled's avatar Julien Muchembled

Replace --prune-orphan storage option with a command that can be used in RUNNING state

parents fd007f5d ccbf7bce
......@@ -71,6 +71,7 @@ class AdminEventHandler(EventHandler):
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
class MasterEventHandler(EventHandler):
......
......@@ -130,10 +130,6 @@ class ConfigurationManager(object):
# only from command line
return self.argument_list.get('reset', False)
def getPruneOrphan(self):
# only from command line
return self.argument_list.get('prune_orphan', False)
def getUUID(self):
# only from command line
uuid = self.argument_list.get('uuid', None)
......
......@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, thread
import os
from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EEXIST, EINTR, ENOENT
......@@ -35,7 +35,6 @@ class EpollEventManager(object):
"""This class manages connections and events based on epoll(5)."""
_timeout = None
_trigger_exit = False
def __init__(self):
self.connection_dict = {}
......@@ -43,6 +42,7 @@ class EpollEventManager(object):
self.writer_set = set()
self.epoll = epoll()
self._pending_processing = []
self._trigger_list = []
self._trigger_fd, w = os.pipe()
os.close(w)
self._trigger_lock = Lock()
......@@ -231,9 +231,12 @@ class EpollEventManager(object):
if fd == self._trigger_fd:
with self._trigger_lock:
self.epoll.unregister(fd)
if self._trigger_exit:
del self._trigger_exit
thread.exit()
action_list = self._trigger_list
try:
while action_list:
action_list.pop(0)()
finally:
del action_list[:]
continue
if conn.readable():
self._addPendingConnection(conn)
......@@ -253,9 +256,9 @@ class EpollEventManager(object):
def setTimeout(self, *args):
self._timeout, self._on_timeout = args
def wakeup(self, exit=False):
def wakeup(self, *actions):
with self._trigger_lock:
self._trigger_exit |= exit
self._trigger_list += actions
try:
self.epoll.register(self._trigger_fd)
except IOError, e:
......
......@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO
from struct import Struct
PROTOCOL_VERSION = 8
PROTOCOL_VERSION = 9
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -1175,6 +1175,25 @@ class SetClusterState(Packet):
_answer = Error
class Repair(Packet):
"""
Ask storage nodes to repair their databases. ctl -> A -> M
"""
_flags = map(PBoolean, ('dry_run',
# 'prune_orphan' (commented because it's the only option for the moment)
))
_fmt = PStruct('repair',
PFUUIDList,
*_flags)
_answer = Error
class RepairOne(Packet):
"""
See Repair. M -> S
"""
_fmt = PStruct('repair', *Repair._flags)
class ClusterInformation(Packet):
"""
Notify information about the cluster
......@@ -1684,6 +1703,10 @@ class Packets(dict):
TweakPartitionTable, ignore_when_closed=False)
SetClusterState = register(
SetClusterState, ignore_when_closed=False)
Repair = register(
Repair)
NotifyRepair = register(
RepairOne)
NotifyClusterInformation = register(
ClusterInformation)
AskClusterState, AnswerClusterState = register(
......
......@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading, weakref
import thread, threading, weakref
from . import logging
from .app import BaseApplication
from .connection import ConnectionClosed
......@@ -69,7 +69,7 @@ class ThreadedApplication(BaseApplication):
conn.close()
# Stop polling thread
logging.debug('Stopping %s', self.poll_thread)
self.em.wakeup(True)
self.em.wakeup(thread.exit)
else:
super(ThreadedApplication, self).close()
......
......@@ -147,6 +147,19 @@ class AdministrationHandler(MasterHandler):
logging.warning('No node added')
conn.answer(Errors.Ack('No node added'))
def repair(self, conn, uuid_list, *args):
getByUUID = self.app.nm.getByUUID
node_list = []
for uuid in uuid_list:
node = getByUUID(uuid)
if node is None or not (node.isStorage() and node.isIdentified()):
raise ProtocolError("invalid storage node %s" % uuid_str(uuid))
node_list.append(node)
repair = Packets.NotifyRepair(*args)
for node in node_list:
node.notify(repair)
conn.answer(Errors.Ack(''))
def tweakPartitionTable(self, conn, uuid_list):
app = self.app
state = app.getClusterState()
......
......@@ -36,6 +36,7 @@ action_dict = {
'tweak': 'tweakPartitionTable',
'drop': 'dropNode',
'kill': 'killNode',
'prune_orphan': 'pruneOrphan',
'truncate': 'truncate',
}
......@@ -146,20 +147,20 @@ class TerminalNeoCTL(object):
assert len(params) == 0
return self.neoctl.startCluster()
def _getStorageList(self, params):
if len(params) == 1 and params[0] == 'all':
node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
return [node[2] for node in node_list]
return map(self.asNode, params)
def enableStorageList(self, params):
"""
Enable cluster to make use of pending storages.
Parameters: all
node [node [...]]
node: if "all", add all pending storage nodes.
Parameters: node [node [...]]
node: if "all", add all pending storage nodes,
otherwise, the list of storage nodes to enable.
"""
if len(params) == 1 and params[0] == 'all':
node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
uuid_list = [node[2] for node in node_list]
else:
uuid_list = map(self.asNode, params)
return self.neoctl.enableStorageList(uuid_list)
return self.neoctl.enableStorageList(self._getStorageList(params))
def tweakPartitionTable(self, params):
"""
......@@ -189,6 +190,20 @@ class TerminalNeoCTL(object):
"""
return uuid_str(self.neoctl.getPrimary())
def pruneOrphan(self, params):
"""
Fix database by deleting unreferenced raw data
This can take a long time.
Parameters: dry_run node [node [...]]
dry_run: 0 or 1
node: if "all", ask all connected storage nodes to repair,
otherwise, only the given list of storage nodes.
"""
dry_run = "01".index(params.pop(0))
return self.neoctl.repair(self._getStorageList(params), dry_run)
def truncate(self, params):
"""
Truncate the database at the given tid.
......
......@@ -172,6 +172,12 @@ class NeoCTL(BaseApplication):
raise RuntimeError(response)
return response[1]
def repair(self, *args):
response = self.__ask(Packets.Repair(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def truncate(self, tid):
response = self.__ask(Packets.Truncate(tid))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
......
......@@ -30,8 +30,6 @@ parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--prune-orphan', action='store_true', help='fix database'
' by deleting unreferenced raw data, and exit (this can take a long time)')
parser.add_option('--reset', action='store_true',
help='remove an existing database if any, and exit')
......@@ -55,7 +53,5 @@ def main(args=None):
# and then, load and run the application
from neo.storage.app import Application
app = Application(config)
if config.getPruneOrphan():
print app.dm.pruneOrphan(), 'deleted record(s)'
elif not config.getReset():
if not config.getReset():
app.run()
......@@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
......@@ -53,6 +55,7 @@ class DatabaseManager(object):
ENGINES = ()
_deferred = 0
_duplicating = _repairing = None
def __init__(self, database, engine=None, wait=0):
"""
......@@ -71,11 +74,27 @@ class DatabaseManager(object):
if attr == "_getPartition":
np = self.getNumPartitions()
value = lambda x: x % np
else:
elif self._duplicating is None:
return self.__getattribute__(attr)
else:
value = getattr(self._duplicating, attr)
setattr(self, attr, value)
return value
@contextmanager
def _duplicate(self):
cls = self.__class__
db = cls.__new__(cls)
db._duplicating = self
try:
db._connect()
finally:
del db._duplicating
try:
yield db
finally:
db.close()
@abstract
def _parse(self, database):
"""Called during instantiation, to process database parameter."""
......@@ -424,11 +443,6 @@ class DatabaseManager(object):
aborted before vote. This method is used to reclaim the wasted space.
"""
def pruneOrphan(self):
n = self._pruneData(self.getOrphanList())
self.commit()
return n
@abstract
def _pruneData(self, data_id_list):
"""To be overridden by the backend to delete any unreferenced data
......@@ -604,6 +618,37 @@ class DatabaseManager(object):
self._setTruncateTID(None)
self.commit()
def repair(self, weak_app, dry_run):
t = self._repairing
if t and t.is_alive():
logging.error('already repairing')
return
def repair():
l = threading.Lock()
l.acquire()
def finalize():
try:
if data_id_list and not dry_run:
self.commit()
logging.info("repair: deleted %s orphan records",
self._pruneData(data_id_list))
self.commit()
finally:
l.release()
try:
with self._duplicate() as db:
data_id_list = db.getOrphanList()
logging.info("repair: found %s records that may be orphan",
len(data_id_list))
weak_app().em.wakeup(finalize)
l.acquire()
finally:
del self._repairing
logging.info("repair: done")
t = self._repairing = threading.Thread(target=repair)
t.daemon = 1
t.start()
@abstract
def getTransaction(self, tid, all = False):
"""Return a tuple of the list of OIDs, user information,
......
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import weakref
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, StoppedOperation
......@@ -59,3 +60,7 @@ class BaseMasterHandler(EventHandler):
def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
def notifyRepair(self, conn, *args):
app = self.app
app.dm.repair(weakref.ref(app), *args)
......@@ -367,7 +367,7 @@ class ServerNode(Node):
raise ConnectorException
def stop(self):
self.em.wakeup(True)
self.em.wakeup(thread.exit)
class AdminApplication(ServerNode, neo.admin.app.Application):
pass
......
......@@ -17,6 +17,7 @@
import os
import sys
import threading
import time
import transaction
import unittest
from thread import get_ident
......@@ -1424,6 +1425,39 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testPruneOrphan(self):
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
cluster.importZODB()(3)
bad = []
ok = []
def data_args(value):
return makeChecksum(value), value, 0
node_list = []
for i, s in enumerate(cluster.storage_list):
node_list.append(s.uuid)
if i:
s.dm.holdData(*data_args('boo'))
ok.append(s.getDataLockInfo())
for i in xrange(3 - i):
s.dm.storeData(*data_args('!' * i))
bad.append(s.getDataLockInfo())
s.dm.commit()
def check(dry_run, expected):
cluster.neoctl.repair(node_list, dry_run)
for e, s in zip(expected, cluster.storage_list):
while 1:
self.tic()
if s.dm._repairing is None:
break
time.sleep(.1)
self.assertEqual(e, s.getDataLockInfo())
check(1, bad)
check(0, ok)
check(1, ok)
finally:
cluster.stop()
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment