Commit cad754f0 authored by Vincent Pelletier

Initial import of TIDStorage product.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24513 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 49878f7d
############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its possible inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
class ClientDisconnected(Exception):
pass
class ExchangeProtocol:
"""
Handle data exchange between client and server.
Kinds of data which can be exchanged:
- str
send_field
recv_field
- int
send_int
recv_int
- list of str
send_list
recv_list
- list of int
send_int_list
recv_int_list
- dict (key: str, value: int)
send_dict
recv_dict
Forbidden chars:
Send (raise if present):
\\n (field separator)
Receive (stripped silently):
\\n (field separator)
\\r (for compatibility)
"""
def __init__(self, socket):
self._socket = socket
def send_field(self, to_send):
if type(to_send) is not str:
raise ValueError, 'Value is not of str type: %r' % (type(to_send), )
if '\n' in to_send:
raise ValueError, '\\n is a forbidden value.'
# Use sendall: socket.send() may transmit only part of the buffer.
self._socket.sendall(to_send)
self._socket.sendall('\n')
def recv_field(self):
received = None
result = []
append = result.append
while received != '\n':
received = self._socket.recv(1)
if len(received) == 0:
raise ClientDisconnected
if received != '\r':
append(received)
return ''.join(result[:-1])
def send_int(self, to_send):
self.send_field(str(to_send))
def recv_int(self):
return int(self.recv_field())
def send_list(self, to_send, send_length=True):
assert isinstance(to_send, (tuple, list))
if send_length:
self.send_int(len(to_send))
for field in to_send:
self.send_field(field)
def send_int_list(self, to_send, *args, **kw):
self.send_list([str(x) for x in to_send], *args, **kw)
def recv_list(self, length=None):
result = []
append = result.append
if length is None:
length = int(self.recv_field())
for field_number in xrange(length):
append(self.recv_field())
return result
def recv_int_list(self, *args, **kw):
return [int(x) for x in self.recv_list(*args, **kw)]
def send_dict(self, to_send):
"""
Key: string
Value: int
"""
assert isinstance(to_send, dict)
if len(to_send) == 0:
key_list = value_list = []
else:
key_list, value_list = zip(*to_send.items())
self.send_list(key_list)
self.send_int_list(value_list, send_length=False)
def recv_dict(self):
"""
Key: string
Value: int
"""
key_list = self.recv_list()
value_list = self.recv_int_list(len(key_list))
result = dict(zip(key_list, value_list))
return result
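# A minimal self-test sketch (assumes a Unix platform, where
# socket.socketpair() is available); run this module directly to exercise
# the exchange primitives defined above. Identifiers and values are
# illustrative.
if __name__ == '__main__':
  import socket
  client_sock, server_sock = socket.socketpair()
  client = ExchangeProtocol(socket=client_sock)
  server = ExchangeProtocol(socket=server_sock)
  client.send_field('dump')
  assert server.recv_field() == 'dump'
  client.send_dict({'storage_1': 123, 'storage_2': 456})
  assert server.recv_dict() == {'storage_1': 123, 'storage_2': 456}
  print 'ExchangeProtocol round-trip OK'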
1) Protocol:
Any character is allowed in data, except \n and \r.
Every field ends with a \n (\r is ignored).
No escaping.
When transferring lists, the list is preceded by the number of fields it contains.
Ex:
3\n
foo\n
bar\n
baz\n
2) Commit start command:
BEGIN\n
<commit identifier>\n
<list of impacted storages>
<commit identifier>: must be identical to the one given at the end of the operation (whether it is an ABORT or a COMMIT)
<list of impacted storages>: list of the identifiers of the storages impacted by the commit
NB: the list ends with a \n, so it is not repeated here.
Response: (nothing)
3) Transaction abort command:
ABORT\n
<commit identifier>\n
<commit identifier>: (cf. BEGIN)
Response: (nothing)
4) Transaction finalization command:
COMMIT\n
<commit identifier>\n
<list of impacted storages>
<list of committed TIDs>
<commit identifier>: (cf. BEGIN)
<list of impacted storages>: (cf. BEGIN)
<list of committed TIDs>: Same length as the list of impacted storages. Its order must match the latter.
NB: the list ends with a \n, so it is not repeated here.
Response: (nothing)
5) Data read command:
DUMP\n
Response:
<list of storages>
<list of TIDs>
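6) Example session (illustrative commit identifier, storage names and TID
values; the server also accepts a QUIT command, which closes the
connection):
Client sends:
BEGIN\n
T1\n
2\n
S1\n
S2\n
COMMIT\n
T1\n
2\n
S1\n
S2\n
3\n
4\n
DUMP\n
Server response to DUMP:
2\n
S1\n
S2\n
3\n
4\n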
############################################################################
#
# Copyright (c) 2007 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its possible inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from ZEO.ClientStorage import ClientStorage
LAST_COMMITED_TID_PROPERTY_ID = '_last_commited_tid'
# Wrap tpc_finish's hook ("f" argument).
# The new hook must be a local function because it needs access to
# tpc_finish's "self" and to the original hook.
original_tpc_finish = ClientStorage.tpc_finish
def tpc_finish(self, txn, f=None):
def saveTIDOnInstance(tid):
if f is not None:
f(tid)
setattr(self, LAST_COMMITED_TID_PROPERTY_ID, tid)
return original_tpc_finish(self, txn, f=saveTIDOnInstance)
ClientStorage.tpc_finish = tpc_finish
def getLastCommitedTID(self):
"""
Return the last committed TID for this storage, or None if no
transaction was committed yet.
"""
return getattr(self, LAST_COMMITED_TID_PROPERTY_ID, None)
ClientStorage.getLastCommitedTID = getLastCommitedTID
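# Usage sketch (assumes a ZEO server listening on localhost:8100; the
# address and the ZODB.DB wrapper are illustrative):
#   import ZEOClientStorage            # applies the patch above
#   import transaction
#   from ZODB import DB
#   from ZEO.ClientStorage import ClientStorage
#   storage = ClientStorage(('localhost', 8100))
#   connection = DB(storage).open()
#   connection.root()['x'] = 1
#   transaction.commit()
#   storage.getLastCommitedTID()       # 8-byte TID string after the commit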
############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its possible inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Load monkey patches
import transaction_transaction
import ZEOClientStorage
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its possible inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# About errors in TIDStorage logs:
# - CRITICAL: Decreasing update ignored
# This error means that any backup started prior to this error can contain
# incomplete transaction data.
# This error can happen when TIDStorage did not handle received data in the
# right order.
# Example:
# 3 storages (S1, S2, S3):
# They all start at TID=1 value.
# 2 transaction (T1, T2):
# T1 commits TID 3 on S2, TID 2 on S3
# T2 commits TID 2 on S1, TID 2 on S2
# Due to those TIDs, TIDStorage *should* handle data in this order:
# T2begin, T2commit, T1begin, T1commit
# Or:
# T2begin, T1begin, T2commit, T1commit
# Or even, though it denotes a late handling of T2commit:
# T2begin, T1begin, T1commit, T2commit
# But, if TIDStorage handles data in the following order:
# T1begin, T1commit, T2begin, T2commit
# *AND* a backup dumps TIDStorage content at a point between T1commit and
# T2commit, then the backup will contain T2's commit on S2, which has a
# lower TID than T1's commit on that same storage.
#
# - Abort received, but never began
# and
# - Commit received, but never began
# These errors mean that packets were lost/never received.
# This should not happen, since network connection is TCP, and TCP
# retransmits data.
# But it happens frequently if TIDStorage is started when Zope is under
# load. This is because Zope attempted to contact TIDStorage at the "begin"
# step, but could not reach it. Then, at commit (or abort) it could reach
# TIDStorage, causing the error message.
# This error is benign, because:
# - Until bootstrap is complete, no TID is available for backup
# - When bootstrap is complete, it means that every ZODB got unlocked
# at some point (since TIDStorage commit happens after ZODB tpc_finish
# lock release).
# - When a transaction sends data to multiple ZODBs, there is a point in
# time when ALL impacted ZODBs are locked.
# The conclusion of all this is that any transaction started before
# TIDStorage was available has necessarily finished (committed or aborted)
# by the time bootstrap finished.
# So no backup can be affected by such a message (but backup feasibility
# can get delayed, as locks would delay bootstrap end, and hence TID data
# availability).
import os
import imp
import sys
import pwd
import grp
import sets
import time
import urllib
import socket
import signal
import getopt
import SocketServer
import threading
import traceback
from ExchangeProtocol import ClientDisconnected, ExchangeProtocol
class TransactionException(Exception):
pass
class AlwaysIncreasingDict(dict):
"""
When inserting/updating a value, check that the new one is strictly
greater than the existing value (or 0 if no value existed for the
given key).
Values are expected to be integers.
TODO:
- Do not descend from dict to prevent users from avoiding checks.
"""
def __init__(self, strict=False, *args, **kw):
dict.__init__(self, *args, **kw)
self._strict = strict
def __setitem__(self, key, value):
if self.get(key, 0) < value:
dict.__setitem__(self, key, value)
else:
if self._strict:
log('CRITICAL: Decreasing update ignored: key=%r %r <= %r' % \
(key, value, self.get(key, 0)))
def update(self, other):
"""
To check for decreases.
"""
for key, value in other.iteritems():
self[key] = value
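# Behaviour sketch (illustrative keys and values):
#   >>> d = AlwaysIncreasingDict()
#   >>> d['s1'] = 5
#   >>> d['s1'] = 3                    # ignored: 3 is not greater than 5
#   >>> d['s1']
#   5
#   >>> d.update({'s1': 7, 's2': 1})
#   >>> d['s1'], d['s2']
#   (7, 1)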
class TransactionTracker:
"""
Implement transaction tracking.
This class is not thread-safe.
A transaction starts with a call to "begin" and ends with a call to
"finish" with the same identifier.
"finish" returns payload provided at begin (or True if no payload was
given) if nothing illegal was detected, otherwise returns False.
Illegal cases:
- "begin" called twice without intermediate "finish" call
- "finish" called without a corresponding "begin" call (this includes
calling "finish" twice)
"""
def __init__(self):
self._container = {}
def begin(self, identifier, payload=True):
if identifier in self._container:
raise TransactionException, 'Begin called twice in a row.'
self._container[identifier] = payload
def finish(self, identifier):
if identifier not in self._container:
raise TransactionException, 'Finish called without former "begin" call.'
return self._container.pop(identifier)
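# Behaviour sketch (illustrative identifiers):
#   >>> tracker = TransactionTracker()
#   >>> tracker.begin('t1', ['s1', 's2'])
#   >>> tracker.finish('t1')
#   ['s1', 's2']
#   >>> tracker.finish('t1')           # no matching "begin": raises
#   Traceback (most recent call last):
#   ...
#   TransactionException: Finish called without former "begin" call.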
class TIDServer(SocketServer.BaseRequestHandler):
"""
Exchange data with connected peer.
TODO:
- Implement socket buffering.
"""
def log(self, message):
log('%r: %s' % (self.client_address, message))
def dump(self):
tid_dict = self._tid_storage.dump()
self._field_exchange.send_dict(tid_dict)
def begin(self):
identifier = self._field_exchange.recv_field()
storage_id_list = self._field_exchange.recv_list()
self._tid_storage.begin(identifier, storage_id_list)
def abort(self):
identifier = self._field_exchange.recv_field()
self._tid_storage.abort(identifier)
def commit(self):
identifier = self._field_exchange.recv_field()
tid_dict = self._field_exchange.recv_dict()
self._tid_storage.commit(identifier, tid_dict)
def handle(self):
global tid_storage
self._tid_storage = tid_storage
self._field_exchange = ExchangeProtocol(socket=self.request)
command_mapping = {
'begin': self.begin,
'abort': self.abort,
'commit': self.commit,
'dump': self.dump
}
self.log('Connected')
try:
# Intercept ClientDisconnected exception to stop thread nicely instead
# of crashing.
# Log all other exceptions.
while True:
received = self._field_exchange.recv_field()
command_str = received.lower()
if command_str == 'quit':
break
method = command_mapping.get(command_str)
if method is not None:
# Intercept all errors to log them instead of causing disconnection.
# Except, of course, the ClientDisconnected exception itself.
try:
method()
except ClientDisconnected:
raise
except:
self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
except ClientDisconnected:
pass
except:
self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
self.log('Client disconnected')
self.request.shutdown(socket.SHUT_RDWR)
self.request.close()
return
class TIDStorage:
"""
Store ZODB TIDs for multiple ZODBs.
Designed to be a singleton.
Thread-safe.
Consequently, transactions are not bound to a specific connection: If a
connection is cut after a "begin", reconnecting and issuing "abort" or
"commit" is valid.
TODO:
- Use smaller locking areas
- Improve decision-taking algorithm in _unregisterTransactionID (implies
modifying _registerTransactionIDAndStorageID).
"""
_storage_id_lock = threading.RLock()
_next_full_dump = None
_next_dump = None
_tid_file = None
_burst_period = None
_full_dump_period = None
def __init__(self, tid_file_path=None, burst_period=None, full_dump_period=None):
self._transaction_tracker = TransactionTracker()
self._storage = AlwaysIncreasingDict(strict=True)
self._transcient = AlwaysIncreasingDict()
self._storage_id_to_transaction_id_list_dict = {}
self._transaction_id_to_storage_id_list_dict = {}
self._storage_id_to_storage_id_set_dict = {}
if tid_file_path is not None:
self._tid_file = LogFile(tid_file_path)
self._burst_period = burst_period
self._full_dump_period = full_dump_period
now = time.time()
if full_dump_period is not None:
self._next_full_dump = now
if burst_period is not None:
self._next_dump = now
self._since_last_burst = sets.Set()
def __repr__(self):
result = []
append = result.append
self._storage_id_lock.acquire()
try:
append('_storage_id_to_transaction_id_list_dict=' + \
repr(self._storage_id_to_transaction_id_list_dict))
append('_transaction_id_to_storage_id_list_dict=' + \
repr(self._transaction_id_to_storage_id_list_dict))
append('_storage_id_to_storage_id_set_dict=' + \
repr(self._storage_id_to_storage_id_set_dict))
append('_transcient=' + repr(self._transcient))
append('_storage=' + repr(self._storage))
finally:
self._storage_id_lock.release()
return '\n'.join(result)
def _registerTransactionIDAndStorageID(self, transaction_id, storage_id_list):
assert len(storage_id_list) != 0
assert self._storage_id_lock.acquire(False)
try:
# Update transaction_id -> storage_id_list
assert transaction_id not in self._transaction_id_to_storage_id_list_dict
self._transaction_id_to_storage_id_list_dict[transaction_id] = storage_id_list
storage_id_set = sets.Set(storage_id_list)
storage_id_set_id_set = sets.Set()
for storage_id in storage_id_list:
# Update storage_id -> transaction_id_list
identifier_set = self._storage_id_to_transaction_id_list_dict.get(storage_id)
if identifier_set is None:
identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id] = sets.Set()
assert transaction_id not in identifier_set
identifier_set.add(transaction_id)
# Prepare the update storage_id -> storage_id_set
existing_storage_id_set = self._storage_id_to_storage_id_set_dict.get(storage_id, None)
if existing_storage_id_set is not None:
storage_id_set.union_update(existing_storage_id_set)
storage_id_set_id_set.add(id(existing_storage_id_set))
# Update storage_id -> storage_id_set
# Cannot use iteritems because dict is modified in the loop.
for key, value in self._storage_id_to_storage_id_set_dict.items():
if id(value) in storage_id_set_id_set:
self._storage_id_to_storage_id_set_dict[key] = storage_id_set
for storage_id in storage_id_set:
self._storage_id_to_storage_id_set_dict[storage_id] = storage_id_set
finally:
self._storage_id_lock.release()
def _unregisterTransactionID(self, transaction_id):
"""
Also transfers from self._transcient to self._storage.
"""
assert self._storage_id_lock.acquire(False)
try:
# Update transaction_id -> storage_id_list and retrieve storage_id_list
# Raises if not found
storage_id_list = self._transaction_id_to_storage_id_list_dict.pop(transaction_id)
# Update storage_id -> transaction_id_list
for storage_id in storage_id_list:
identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id]
# Raises if not found
identifier_set.remove(transaction_id)
if len(identifier_set) == 0:
del self._storage_id_to_transaction_id_list_dict[storage_id]
# Update storage_id -> storage_id_set
# Raises if not found
storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id]
# Raises if not found
storage_id_set.remove(storage_id)
if self._tid_file is not None:
now = time.time()
can_full_dump = has_bootstraped and (self._next_full_dump is not None) and (self._next_full_dump < now)
can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
record_for_dump = can_dump or (self._next_dump is not None)
append_to_file = has_bootstraped and (can_dump or can_full_dump)
else:
append_to_file = record_for_dump = can_dump = can_full_dump = False
for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
if len(value) == 0 and key in self._transcient:
self._storage[key] = self._transcient.pop(key)
if record_for_dump:
self._since_last_burst.add(key)
if append_to_file:
if can_full_dump:
to_dump_dict = self._storage
dump_code = 'f'
else:
to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
dump_code = 'd'
if len(to_dump_dict):
self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))
if can_full_dump:
self._next_full_dump = now + self._full_dump_period
if self._next_dump is not None:
self._next_dump = now + self._burst_period
self._since_last_burst.clear()
if not has_bootstraped:
doBootstrap()
#if len(self._storage_id_to_transaction_id_list_dict) == 0:
# self._storage.update(self._transcient)
# self._transcient.clear()
finally:
self._storage_id_lock.release()
def dump(self):
self._storage_id_lock.acquire()
try:
return self._storage.copy()
finally:
self._storage_id_lock.release()
def begin(self, transaction_id, storage_id_list):
self._storage_id_lock.acquire()
try:
self._transaction_tracker.begin(transaction_id, storage_id_list)
self._registerTransactionIDAndStorageID(transaction_id, storage_id_list)
finally:
self._storage_id_lock.release()
def abort(self, transaction_id):
self._storage_id_lock.acquire()
try:
try:
self._transaction_tracker.finish(transaction_id)
except TransactionException:
# Overwrite exception message
raise TransactionException, 'Abort received, but never began'
self._unregisterTransactionID(transaction_id)
finally:
self._storage_id_lock.release()
def commit(self, transaction_id, tid_dict):
self._storage_id_lock.acquire()
try:
try:
storage_id_list = self._transaction_tracker.finish(transaction_id)
except TransactionException:
# Overwrite exception message
raise TransactionException, 'Commit received, but never began'
check_dict = tid_dict.copy()
for storage_id in storage_id_list:
del check_dict[storage_id]
assert len(check_dict) == 0
self._transcient.update(tid_dict)
self._unregisterTransactionID(transaction_id)
finally:
self._storage_id_lock.release()
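# Flow sketch (illustrative identifiers and TIDs, no status file): a TID
# only becomes visible in dump() once every transaction overlapping the
# same storages has finished.
#   >>> s = TIDStorage()
#   >>> s.begin('t1', ['s1', 's2'])
#   >>> s.begin('t2', ['s2', 's3'])
#   >>> s.commit('t1', {'s1': 1, 's2': 1})
#   >>> s.dump()                       # withheld: t2 still overlaps s2
#   {}
#   >>> s.commit('t2', {'s2': 2, 's3': 1})
#   >>> s.dump()
#   {'s1': 1, 's2': 2, 's3': 1}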
class BootstrapContent(threading.Thread):
"""
Thread used to bootstrap TIDStorage content.
This must be started at first client request, and must be run only once.
Global boolean "has_bootstraped" is set to true once it succeeded.
"""
def __init__(self, *args, **kw):
threading.Thread.__init__(self, *args, **kw)
self.setDaemon(True)
def run(self):
"""
Contact all zopes to serialize all their storages.
"""
global has_bootstraped
base_url = options.base_url
if base_url is not None:
storage_id_to_object_path_dict = dict([(key, value[2]) for key, value
in options.known_tid_storage_identifier_dict.iteritems()
if value[2] is not None])
target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys())
known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
to_check_storage_id_set = target_storage_id_set - known_storage_id_set
while len(to_check_storage_id_set) and can_bootstrap:
serialize_url = None
for storage_id in to_check_storage_id_set:
if can_bootstrap and storage_id not in tid_storage.dump().keys():
serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], )
try:
# Query a Zope, which will contact this process in return to store
# the new TID number, making the given storage known.
page = urllib.urlopen(serialize_url)
except Exception, message:
log('Exception during bootstrap (%r):\n%s' % (serialize_url, ''.join(traceback.format_exception(*sys.exc_info()))))
else:
log('Opened %r: %r' % (serialize_url, page.read()))
# Leave some time for Zope to contact TIDStorage back and fill the gaps.
time.sleep(5)
known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
to_check_storage_id_set = target_storage_id_set - known_storage_id_set
if len(to_check_storage_id_set):
log('Bootstrap in progress... Missing storages: %r' % (to_check_storage_id_set, ))
# Retry a bit later
time.sleep(60)
if len(to_check_storage_id_set) == 0:
log('Bootstrap done (%i storages).' % (len(target_storage_id_set), ))
has_bootstraped = True
else:
has_bootstraped = True
bootstrap_content = BootstrapContent()
has_bootstraped = False
can_bootstrap = True
bootstrap_lock = threading.RLock()
def doBootstrap():
acquired = bootstrap_lock.acquire(False)
if acquired:
try:
if not bootstrap_content.isAlive():
bootstrap_content.start()
finally:
bootstrap_lock.release()
def log(message):
print >> sys.stdout, '%s: %s' % (time.asctime(), message)
class PoliteThreadingTCPServer(SocketServer.ThreadingTCPServer):
def server_close(self):
# Make the port reusable for listening as soon as the socket closes.
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def main(address, port):
server = PoliteThreadingTCPServer((address, port), TIDServer)
try:
try:
log('Server listening.')
server.serve_forever()
except KeyboardInterrupt:
log('Shutting down (received KeyboardInterrupt).')
finally:
global can_bootstrap
can_bootstrap = False
log('Waiting for clients to disconnect...')
server.server_close()
log_file_set = sets.Set()
class LogFile:
"""
Logging-to-file class.
Allows rotating file.
Can be used as stdout/stderr file: no write from any thread is lost during
rotation. There is an unavoidable (?) race condition if anything gets
raised by the rotating thread between "self._to_buffer = False" completion
and following log flush.
"""
_file = None
def __init__(self, file_name):
self._lock = threading.RLock()
self._file_name = file_name
self._to_buffer = False
self._buffer = []
self._open()
log_file_set.add(self)
def _open(self):
self._file = open(self._file_name, 'a', 0)
def write(self, value):
self._lock.acquire()
try:
if self._to_buffer:
self._buffer.append(value)
else:
self._file.write(value)
finally:
self._lock.release()
def close(self):
self._lock.acquire()
try:
log_file_set.remove(self)
self._file.close()
self._file = None
finally:
self._lock.release()
def rotate(self):
self._lock.acquire()
try:
self._to_buffer = True
self._file.close()
self._open()
# XXX: race condition below if rotating stderr: Any exception thrown
# here will be out-of-order in resulting log.
self._to_buffer = False
self.write(''.join(self._buffer))
# End of race condition.
finally:
self._lock.release()
def HUPHandler(signal_number, stack):
rotate_count = 0
log('Rotating logfiles...')
for log_file in log_file_set:
rotate_count += 1
log_file.rotate()
log('Logfiles rotated (%i).' % (rotate_count, ))
def USR1Handler(signal_number, stack):
log(repr(tid_storage))
def TERMHandler(signal_number, stack):
log('Received SIGTERM, exiting.')
raise KeyboardInterrupt, 'Killed by SIGTERM'
def usage():
print """
Usage: %(arg0)s [-h] [-n|--nofork|--fg] [-l|--log] [-p|--port] [-a|--address]
[--pidfile] [--user] [--group] [-s|--status-file] [-b|--burst-period]
[-F|--full-dump-period]
-h
Display this help.
-n
--nofork
--fg
Do not fork in background.
-l filename
--log filename
Log to given filename, instead of default %(logfile_name)s.
-p number
--port number
Listen on given port number, instead of default %(port)i.
-a address
--address address
Listen on the interface running given address, instead of default %(address)s.
--pidfile file_path
If forking, this file will contain the pid of forked process.
If this argument is not provided, pid is written to %(pidfile_name)s.
--user user_name
Run as specified user.
Also, retrieve user's group and run as this group.
--group group_name
Run as specified group.
If both --user and --group are specified, --group must come last.
-s file_name
--status-file file_name
Append stored TIDs to file.
See also "burst-period" and "full-dump-period".
If not provided, no dump ever happens.
-b seconds
--burst-period seconds
Defines the age of the last write after which an incremental write can
happen. Such writes contain only what changed since the last write.
If not provided, no incremental write is done.
-F seconds
--full-dump-period seconds
How many seconds must separate complete dumps to status file.
Those writes contain the complete current state.
If both a full dump and an incremental write can happen, full dump takes
precedence.
If not provided, no full dump is done.
-c file_name
--config file_name
Use given file as options file.
It must be a python file. See sample_options.py for possible values.
If provided, and if the configuration file defines the base_url and
known_tid_storage_identifier_dict variables, this program will trigger
generation of all TIDs before the first write to the status file.
""" % {'arg0': sys.argv[0],
'logfile_name': Options.logfile_name,
'pidfile_name': Options.pidfile_name,
'port': Options.port,
'address': Options.address}
class Options:
port = 9001
address = '0.0.0.0'
logfile_name = 'tidstorage.log'
pidfile_name = 'tidstorage.pid'
fork = True
setuid = None
setgid = None
status_file = None
burst_period = None
full_dump_period = None
known_tid_storage_identifier_dict = {}
base_url = None
config_file_name = None
options = Options()
try:
opts, args = getopt.getopt(sys.argv[1:],
'hnfl:p:a:s:b:F:c:',
['help', 'nofork', 'fg', 'log=', 'port=',
'address=', 'pidfile=', 'user=', 'group=',
'status-file=', 'burst-period=',
'full-dump-period=', 'config='])
except:
usage()
raise
for opt, arg in opts:
if opt in ('-h', '--help'):
usage()
sys.exit()
elif opt in ('-n', '--fg', '--nofork'):
options.fork = False
elif opt in ('-l', '--log'):
options.logfile_name = arg
elif opt in ('-p', '--port'):
options.port = int(arg)
elif opt in ('-a', '--address'):
options.address = arg
elif opt == '--pidfile':
options.pidfile_name = arg
elif opt == '--user':
pw = pwd.getpwnam(arg)
options.setuid = pw.pw_uid
options.setgid = pw.pw_gid
elif opt == '--group':
options.setgid = grp.getgrnam(arg).gr_gid
elif opt in ('-s', '--status-file'):
options.status_file = arg
elif opt in ('-b', '--burst-period'):
options.burst_period = int(arg)
elif opt in ('-F', '--full-dump-period'):
options.full_dump_period = int(arg)
elif opt in ('-c', '--config'):
options.config_file_name = arg
if options.config_file_name is not None:
config_file = os.path.splitext(os.path.basename(options.config_file_name))[0]
config_path = os.path.dirname(options.config_file_name)
if len(config_path):
config_path = [config_path]
else:
config_path = sys.path
file, path, description = imp.find_module(config_file, config_path)
module = imp.load_module(config_file, file, path, description)
file.close()
for option_id in [x for x in dir(Options) if x[:1] != '_']:
if option_id not in options.__dict__ and hasattr(module, option_id):
setattr(options, option_id, getattr(module, option_id))
if options.logfile_name is not None:
options.logfile_name = os.path.abspath(options.logfile_name)
if options.status_file is not None:
options.status_file = os.path.abspath(options.status_file)
if options.setgid is not None:
os.setgid(options.setgid)
if options.setuid is not None:
os.setuid(options.setuid)
tid_storage = TIDStorage(tid_file_path=options.status_file,
burst_period=options.burst_period,
full_dump_period=options.full_dump_period)
signal.signal(signal.SIGHUP, HUPHandler)
signal.signal(signal.SIGUSR1, USR1Handler)
signal.signal(signal.SIGTERM, TERMHandler)
if options.fork:
pid = os.fork()
if pid == 0:
os.umask(027)
os.setsid()
pid = os.fork()
if pid == 0:
os.close(0)
os.close(1)
os.close(2)
os.open('/dev/null', os.O_RDWR)
os.dup2(0, 1)
os.dup2(0, 2)
sys.stdout = sys.stderr = LogFile(options.logfile_name)
os.chdir('/')
try:
main(options.address, options.port)
except:
# Log exception before returning.
log('Exception caught outside of "main". Previous log entries might ' \
'be out of order because of this exception.\n%s' % (
''.join(traceback.format_exception(*sys.exc_info())), ))
else:
log('Exited normally.')
else:
pidfile = open(options.pidfile_name, 'w')
pidfile.write(str(pid))
pidfile.close()
os._exit(0)
else:
# TODO: monitor child startup to make it easier to use.
os._exit(0)
else:
main(options.address, options.port)
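# Invocation sketch (script and file names are illustrative; the flags are
# the ones documented in usage() above):
#   python tidstorage.py --fg -c options.py -s tidstorage.tid -b 30 -F 300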
--- /home/vincent/bin/zope2.8/bin/repozo.py 2007-02-09 13:52:35.000000000 +0100
+++ repozo.py 2007-10-26 15:30:43.311046075 +0200
@@ -50,6 +50,12 @@
Compress with gzip the backup files. Uses the default zlib
compression level. By default, gzip compression is not used.
+ -m / --max-tid
+ Stop at given TID when saving the Data.fs.
+
+ -M / --print-max-tid
+ Print the last saved transaction's tid.
+
Options for -R/--recover:
-D str
--date=str
@@ -70,6 +76,7 @@
import time
import errno
import getopt
+import base64
from ZODB.FileStorage import FileStorage
@@ -104,10 +111,11 @@
def parseargs():
global VERBOSE
try:
- opts, args = getopt.getopt(sys.argv[1:], 'BRvhf:r:FD:o:Qz',
+ opts, args = getopt.getopt(sys.argv[1:], 'BRvhf:r:FD:o:Qzm:M',
['backup', 'recover', 'verbose', 'help',
'file=', 'repository=', 'full', 'date=',
- 'output=', 'quick', 'gzip'])
+ 'output=', 'quick', 'gzip', 'max-tid=',
+ 'print-max-tid'])
except getopt.error, msg:
usage(1, msg)
@@ -120,6 +128,8 @@
output = None # where to write recovered data; None = stdout
quick = False # -Q flag state
gzip = False # -z flag state
+ print_tid = False # -M flag state
+ max_tid = None # -m argument, if any
options = Options()
@@ -150,6 +160,10 @@
options.output = arg
elif opt in ('-z', '--gzip'):
options.gzip = True
+ elif opt in ('-M', '--print-max-tid'):
+ options.print_tid = True
+ elif opt in ('-m', '--max-tid'):
+ options.max_tid = base64.decodestring(arg)
else:
assert False, (opt, arg)
@@ -174,6 +188,12 @@
if options.file is not None:
log('--file option is ignored in recover mode')
options.file = None
+ if options.print_tid:
+ log('--print-max-tid is ignored in recover mode')
+ options.print_tid = False
+ if options.max_tid is not None:
+ log('--max-tid is ignored in recover mode')
+ options.max_tid = None
return options
@@ -349,13 +369,19 @@
def do_full_backup(options):
# Find the file position of the last completed transaction.
- fs = FileStorage(options.file, read_only=True)
+ fs = FileStorage(options.file, read_only=True, stop=options.max_tid)
# Note that the FileStorage ctor calls read_index() which scans the file
# and returns "the position just after the last valid transaction record".
# getSize() then returns this position, which is exactly what we want,
# because we only want to copy stuff from the beginning of the file to the
# last valid transaction record.
pos = fs.getSize()
+ if options.print_tid:
+ undo_log = fs.undoLog(last=-1)
+ if len(undo_log):
+ print >> sys.stdout, 'Last TID: %s' % (undo_log[0]['id'], )
+ else:
+ print >> sys.stderr, 'Cannot get latest TID'
fs.close()
options.full = True
dest = os.path.join(options.repository, gen_filename(options))
@@ -375,13 +401,19 @@
def do_incremental_backup(options, reposz, repofiles):
# Find the file position of the last completed transaction.
- fs = FileStorage(options.file, read_only=True)
+ fs = FileStorage(options.file, read_only=True, stop=options.max_tid)
# Note that the FileStorage ctor calls read_index() which scans the file
# and returns "the position just after the last valid transaction record".
# getSize() then returns this position, which is exactly what we want,
# because we only want to copy stuff from the beginning of the file to the
# last valid transaction record.
pos = fs.getSize()
+ if options.print_tid:
+ undo_log = fs.undoLog(last=-1)
+ if len(undo_log):
+ print >> sys.stdout, 'Last TID: %s' % (undo_log[0]['id'], )
+ else:
+ print >> sys.stderr, 'Cannot get latest TID'
fs.close()
options.full = False
dest = os.path.join(options.repository, gen_filename(options))
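A sketch of the TID encoding expected by the new -m/--max-tid option above
(it mirrors the encoding used by the wrapper's backup() function below; the
TID value is illustrative):

import base64
from struct import pack
tid_as_int = 123456789
print base64.encodestring(pack('>Q', tid_as_int)).rstrip()  # AAAAAAdbzRU=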
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
# Essentially the "usage" and "parseargs" functions.
# So it is released under the ZPL v2.0, as is Zope 2.8.8.
""" repozo wrapper to backup for multiple Data.fs files in a consistent way.
Usage: %(program)s [-h|--help] [-c|--config configuration_file]
[--repozo repozo_command] [-R|--recover|--recover_check]
[-H|--host address] [-p|--port port_number] [-u|--url formated_url]
[...]
-h
--help
Display this help and exit.
-c configuration_file
--config configuration_file
Use given file as configuration file.
It must be a python file. See sample_configuration.py for required values.
Required if neither -h nor --help is given.
--repozo repozo_command
Use given executable as repozo command.
Default: repozo.py
-R
--recover
Instead of saving existing Data.fs, perform an automated recovery from
backups + timestamp file.
--recover_check
Similar to above, except that it restores files to a temporary folder and
compares them with the existing files.
Files restored this way are automatically deleted after the check.
-H address
--host address
TIDStorage server host address.
Overrides setting found in configuration_file.
Not required if recovering (see above).
-p port_number
--port port_number
TIDStorage port number.
Overrides setting found in configuration_file.
Not required if recovering (see above).
-u formated_url
--url formated_url
Zope base URL, optionally with credentials.
Overrides setting found in configuration_file.
Not required if recovering (see above).
All other parameters are passed on to repozo, but are partly processed by
getopt. To pass parameters to repozo unprocessed, give them as plain
arguments.
"""
from ExchangeProtocol import ExchangeProtocol
import socket
import base64
import imp
import getopt
import sys
import os
# urllib2 does not support (?) urls containing credentials
# (http://login:password@...) but it's fine with urllib.
from urllib import urlopen
import traceback
import md5
import time
import tempfile
from struct import pack, unpack
program = sys.argv[0]
def log(message):
print message
class TIDClient:
def __init__(self, address):
# TODO: handle disconnections nicely
self._address = address
self._socket = socket.socket()
self._socket.connect(address)
self._protocol_handler = ExchangeProtocol(socket=self._socket)
def __call__(self):
"""
Return dict currently stored on the server.
"""
self._protocol_handler.send_field('dump')
return self._protocol_handler.recv_dict()
def backup(address, known_tid_storage_identifier_dict, repozo_formated_command, zope_formated_url=None):
connection = TIDClient(address)
to_load = known_tid_storage_identifier_dict.keys()
load_count = 2
while len(to_load):
if load_count < 1:
raise ValueError, 'It was impossible to retrieve all required TIDs. Missing: %s' % (to_load, )
to_load = []
load_count -= 1
stored_tid_dict = connection()
#log(stored_tid_dict)
for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
if key not in stored_tid_dict and zope_formated_url is not None:
to_load.append(key)
if object_path is not None:
serialize_url = zope_formated_url % (object_path, )
log(serialize_url)
try:
response = urlopen(serialize_url)
except Exception, message:
# Prevent exceptions from interrupting the backup.
# We don't care about how well the web server is working, the only
# important thing is to get all TIDs in TIDStorage, and it's checked
# later.
log(''.join(traceback.format_exception(*sys.exc_info())))
backup_count = 0
total_count = len(known_tid_storage_identifier_dict)
for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
tid_as_int = stored_tid_dict[key] + 1
tid = base64.encodestring(pack('>Q', tid_as_int)).rstrip()
repozo_command = repozo_formated_command % (storage_path, file_path, tid)
if not os.access(storage_path, os.R_OK):
os.makedirs(storage_path)
log('Running %r...' % (repozo_command, ))
status = os.system(repozo_command)
status = os.WEXITSTATUS(status)
if status == 0:
backup_count += 1
else:
log('Error occurred while saving %s: exit status=%i' % (file_path, status))
log('Saved %i FileStorages out of %i.' % (backup_count, total_count))
return total_count - backup_count
def get_md5_diggest(file_instance, length):
BLOCK_SIZE=512
file_instance.seek(0)
md5sum = md5.new()
read = file_instance.read
update = md5sum.update
while length > 0:
to_read = min(BLOCK_SIZE, length)
buffer = read(to_read)
if len(buffer) != to_read:
log('Warning: read %i instead of requested %i, stopping read' % (len(buffer), to_read))
length = 0
else:
length -= to_read
update(buffer)
return md5sum.hexdigest()
def recover(known_tid_storage_identifier_dict, repozo_formated_command, check=False):
recovered_count = 0
total_count = len(known_tid_storage_identifier_dict)
for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
if not os.access(storage_path, os.R_OK):
log('Warning: unable to recover %s because %s is missing/unreadable.' % (file_path, storage_path))
continue
if check:
original_file_path = file_path
file_path = os.path.join(tempfile.gettempdir(), os.path.basename(file_path))
repozo_command = repozo_formated_command % (storage_path, file_path)
status = os.system(repozo_command)
status = os.WEXITSTATUS(status)
if status == 0:
recovered_count += 1
else:
log('Error occurred while recovering %s: exit status=%i' % (file_path, status))
if check:
log('Info: Comparing restored %s with original %s' % (file_path, original_file_path))
recovered_file = open(file_path, 'r')
original_file = open(original_file_path, 'r')
try:
recovered_file.seek(0, 2)
original_file.seek(0, 2)
recovered_file_length = recovered_file.tell()
original_file_length = original_file.tell()
checked_length = recovered_file_length
if recovered_file_length < original_file_length:
log('Info: Shorter than original: -%i bytes (-%.02f%%)' % \
(original_file_length - recovered_file_length,
(1 - float(recovered_file_length) / original_file_length) * 100))
elif recovered_file_length > original_file_length:
log('ERROR: Longer than original: +%i bytes (+%.02f%%). Was the original packed since the backup?' % \
(recovered_file_length - original_file_length,
(float(recovered_file_length) / original_file_length - 1) * 100))
checked_length = None
if checked_length is not None:
recovered_file_diggest = get_md5_diggest(recovered_file, checked_length)
original_file_diggest = get_md5_diggest(original_file, checked_length)
if recovered_file_diggest != original_file_diggest:
log('ERROR: Recovered md5 does not match original: %s != %s.' % \
(recovered_file_diggest, original_file_diggest))
finally:
recovered_file.close()
original_file.close()
os.unlink(file_path)
log('Restored %i FileStorages out of %i.' % (recovered_count, total_count))
return total_count - recovered_count
def usage(code, msg=''):
outfp = sys.stderr
if code == 0:
outfp = sys.stdout
print >> outfp, __doc__ % globals()
if msg:
print >> outfp, msg
sys.exit(code)
def parseargs():
try:
opts, args = getopt.getopt(sys.argv[1:], 'vQr:FhzMRc:H:p:u:',
['help', 'verbose', 'quick', 'full',
'gzip', 'print-max-tid', 'repository',
'repozo=', 'config=', 'host=', 'port=',
'url=', 'recover', 'recover_check'])
except getopt.error, msg:
usage(1, msg)
class Options:
timestamp_file_path = None
repozo_file_name = 'repozo.py'
configuration_file_name = None
repozo_opts = ['-B']
host = None
port = None
base_url = None
known_tid_storage_identifier_dict = {}
recover = False
dry_run = False
options = Options()
if args:
options.repozo_opts.extend(args)
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt in ('-c', '--config'):
options.configuration_file_name = arg
elif opt == '--repozo':
options.repozo_file_name = arg
elif opt in ('-R', '--recover', '--recover_check'):
options.repozo_opts[0] = '-R'
options.recover = True
if opt == '--recover_check':
options.dry_run = True
elif opt in ('-H', '--host'):
options.host = arg
elif opt in ('-p', '--port'):
try:
options.port = int(arg)
except ValueError, msg:
usage(1, msg)
elif opt in ('-u', '--url'):
options.base_url = arg
elif opt in ('-r', '--repository'):
options.repozo_opts.append('%s %s' % (opt, arg))
else:
options.repozo_opts.append(opt)
if options.configuration_file_name is None:
usage(1, 'Either -c or --config is required.')
configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
configuration_path = os.path.dirname(options.configuration_file_name)
if len(configuration_path):
configuration_path = [configuration_path]
else:
configuration_path = sys.path
file, path, description = imp.find_module(configuration_filename, configuration_path)
module = imp.load_module(configuration_filename, file, path, description)
file.close()
try:
options.known_tid_storage_identifier_dict = module.known_tid_storage_identifier_dict
options.timestamp_file_path = module.timestamp_file_path
except AttributeError, msg:
usage(1, msg)
for option_id in ('port', 'host', 'base_url'):
if getattr(options, option_id) is None:
setattr(options, option_id, getattr(module, option_id, None))
# XXX: we do not check any option this way, it's too dangerous.
#options.repozo_opts.extend(getattr(module, 'repozo_opts', []))
if options.port is None:
options.port = 9001
if options.host is None:
usage(1, 'Either -H or --host is required (or host value should be set in configuration file).')
return options
options = parseargs()
address = (options.host, options.port)
zope_formated_url = options.base_url
if options.base_url is not None and '%s' not in zope_formated_url:
raise ValueError, 'Given base url (%r) is not properly formatted, it must contain one \'%%s\'.' % (zope_formated_url, )
repozo_formated_command = '%s %s -r "%%s"' % (options.repozo_file_name, ' '.join(options.repozo_opts))
if options.recover:
timestamp_file = open(options.timestamp_file_path, 'r')
timestamp = ''
read_line = ' '
while len(read_line):
timestamp = read_line
read_line = timestamp_file.readline()
timestamp = timestamp.strip('\r\n \t')
if timestamp:
repozo_formated_command += ' -o "%%s" -D %s' % (timestamp, )
result = recover(
known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
repozo_formated_command=repozo_formated_command,
check=options.dry_run)
else:
repozo_formated_command += ' -f "%s" -m "%s"'
result = backup(
address=address,
known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
zope_formated_url=zope_formated_url,
repozo_formated_command=repozo_formated_command)
if result == 0:
# Paranoid mode:
# Issue a system-wide "sync" command to make sure all files which were saved
# are really present on disk.
os.system('sync')
timestamp_file = open(options.timestamp_file_path, 'a', 0)
try:
# Borrowed from repozo.
timestamp_file.write('\n%04d-%02d-%02d-%02d-%02d-%02d' % time.gmtime()[:6])
finally:
timestamp_file.close()
sys.exit(result)
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
# Essentially the "usage", "parseargs" and parts of the "restore" functions.
# So it is released under the ZPL v2.0, as is Zope 2.8.8.
"""
Usage: %(program)s [-h|--help] [-c|--config configuration_file]
-h
--help
Display this help and exit.
-c configuration_file
--config configuration_file
Use given file as configuration file.
It must be a python file.
Required if neither -h nor --help is given.
"""
import imp
import getopt
import sys
import os
from struct import pack
import shutil
from ZODB.FileStorage import FileStorage
program = sys.argv[0]
def log(message):
print message
def parse(status_file):
tid_log = open(status_file)
content = {}
last_timestamp = None
line = tid_log.readline()
while line != '':
split_line = line.split(' ', 2)
assert len(split_line) == 3, repr(split_line)
line_timestamp, line_type, line_dict = split_line
line_timestamp = float(line_timestamp)
assert line_type in ('f', 'd'), repr(line_type)
if last_timestamp is not None:
assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
last_timestamp = line_timestamp
line_dict = eval(line_dict, None)
assert isinstance(line_dict, dict), type(line_dict)
assert len(line_dict), repr(line_dict)
if line_type == 'd':
for key, value in line_dict.iteritems():
if key in content:
assert content[key] < value, '%r < %r' % (content[key], value)
content[key] = value
elif line_type == 'f':
for key, value in content.iteritems():
assert key in line_dict, repr(key)
assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
content = line_dict
line = tid_log.readline()
return content
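# Status-file sketch accepted by parse() above (illustrative timestamps,
# storage identifiers and TIDs): 'd' lines are incremental bursts, 'f'
# lines are full dumps, and timestamps must strictly increase.
#   1194606216.54 d {'s1': 3}
#   1194606246.82 d {'s1': 4, 's2': 2}
#   1194606516.10 f {'s1': 6, 's2': 2, 's3': 9}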
READCHUNK = 10 * 1024 * 1024
def recover(data_fs_backup_path_dict, status_file):
last_tid_dict = parse(status_file)
for storage_id, (file_path, backup_path) in data_fs_backup_path_dict.iteritems():
# Derived from repozo (function=do_full_backup)
# TODO: optimise to read backup only once.
can_restore = False
if os.path.exists(backup_path):
if os.path.exists(file_path):
print 'Both original and backup files exist for %r. If previous restoration was successful, you should delete the backup for this restoration to take place. Original: %r Backup: %r' % (storage_id, file_path, backup_path)
else:
print 'Only backup file is available for %r: %r. Assuming it\'s ok and restoring to %r' % (storage_id, backup_path, file_path)
can_restore = True
else:
if os.path.exists(file_path):
sys.stdout.write('Copying %r to %r... ' % (file_path, backup_path))
shutil.copy(file_path, backup_path)
initial_size = os.stat(file_path).st_size
final_size = os.stat(backup_path).st_size
if initial_size == final_size:
can_restore = True
print 'Done.'
else:
print 'Backup size %i differs from original size %i. Is the original file (%r) still in use? Is there enough free disk space at the destination (%r)?' % (final_size, initial_size, file_path, backup_path)
else:
print 'Cannot find any file for %r: %r and %r do not exist.' % (storage_id, file_path, backup_path)
if can_restore:
last_tid = last_tid_dict[storage_id] + 1
tid = pack('>Q', last_tid)
# Find the file position of the last completed transaction.
fs = FileStorage(backup_path, read_only=True, stop=tid)
# Note that the FileStorage ctor calls read_index() which scans the file
# and returns "the position just after the last valid transaction record".
# getSize() then returns this position, which is exactly what we want,
# because we only want to copy stuff from the beginning of the file to the
# last valid transaction record.
pos = fs.getSize()
fs.close()
print 'Restoring backup: %s bytes (transaction %r) from %s to %s' % (pos, tid, backup_path, file_path)
source_file = open(backup_path, 'rb')
destination_file = open(file_path, 'wb')
while pos:
todo = min(READCHUNK, pos)
data = source_file.read(todo)
if not data:
print 'Unexpected end of data stream (should contain %i more bytes)' % (pos, )
break
destination_file.write(data)
pos -= len(data)
destination_file.close()
source_file.close()
else:
print 'Skipping restoration of %r (%r).' % (file_path, storage_id)
def usage(code, msg=''):
outfp = sys.stderr
if code == 0:
outfp = sys.stdout
print >> outfp, __doc__ % globals()
if msg:
print >> outfp, msg
sys.exit(code)
def parseargs():
try:
opts, args = getopt.getopt(sys.argv[1:], 'hc:',
['help', 'config='])
except getopt.error, msg:
usage(1, msg)
class Options:
configuration_file_name = None
status_file = None
options = Options()
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt in ('-c', '--config'):
options.configuration_file_name = arg
if options.configuration_file_name is None:
usage(1, 'Either -c or --config is required.')
configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
configuration_path = os.path.dirname(options.configuration_file_name)
if len(configuration_path):
configuration_path = [configuration_path]
else:
configuration_path = sys.path
file, path, description = imp.find_module(configuration_filename, configuration_path)
module = imp.load_module(configuration_filename, file, path, description)
file.close()
try:
options.data_fs_backup_path_dict = module.data_fs_backup_path_dict
options.status_file = module.status_file
except AttributeError, msg:
usage(1, msg)
return options
options = parseargs()
recover(
data_fs_backup_path_dict=options.data_fs_backup_path_dict,
status_file=options.status_file)
# COMMON
# This part is used both by server_v2.py and repozo_tidstorage_v2.py
known_tid_storage_identifier_dict = {
"((('localhost', 8200),), '2')":
('/home/vincent/zeo2/var2/Data.fs',
'/home/vincent/tmp/repozo/z22',
'foo_test'),
"((('localhost', 8200),), '1')":
('/home/vincent/zeo2/var/Data.fs',
'/home/vincent/tmp/repozo/z21',
'bar_test'),
"((('localhost', 8100),), '1')":
('/home/vincent/zeo1/var/Data.fs',
'/home/vincent/tmp/repozo/z11',
'baz_test'),
}
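# Each key is the identifier under which a storage reports its TIDs (here,
# the repr of a ZEO address list and storage name); each value is a
# 3-tuple: (Data.fs path, repozo repository path, Zope object path used
# during bootstrap, or None to skip bootstrap for that storage). The paths
# above are examples.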
base_url = 'http://localhost:5080/erp5/%s/modifyContext'
port = 9001
host = '127.0.0.1'
# SERVER
# This part is only used by server_v2.py
#logfile_name = 'tidstorage.log'
#pidfile_name = 'tidstorage.pid'
#fork = False
#setuid = None
#setgid = None
status_file = 'tidstorage.tid'
burst_period = 30
full_dump_period = 300
# REPOZO_TIDSTORAGE
# This part is only used by repozo_tidstorage_v2.py
timestamp_file_path = 'repozo_tidstorage_timestamp.log'
#!/usr/bin/python
from ExchangeProtocol import ExchangeProtocol
import traceback
import socket
import sys
assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>'
address = sys.argv[1]
port = int(sys.argv[2])
class TIDClient:
def __init__(self, address):
self._to_server = socket.socket()
self._to_server.connect(address)
self._exchange_protocol = ExchangeProtocol(self._to_server)
def _dump(self, test_id=None):
self._exchange_protocol.send_field('dump')
received_dict = self._exchange_protocol.recv_dict()
if test_id is None:
result = received_dict
else:
id_len = len(test_id) + 1 # Add 1 to strip underscore.
result = dict([(key[id_len:], value) \
for key, value in received_dict.iteritems() \
if key.startswith(test_id)])
return dict([(key, int(value)) for key, value in result.iteritems()])
def dump(self, test_id):
return self._dump(test_id=test_id)
def dump_all(self):
return self._dump()
def begin(self, test_id, transaction_id, storage_id_list):
self._exchange_protocol.send_field('begin')
self._exchange_protocol.send_field(transaction_id)
internal_storage_id_list = ['%s_%s' % (test_id, x) \
for x in storage_id_list]
self._exchange_protocol.send_list(internal_storage_id_list)
def abort(self, test_id, transaction_id):
self._exchange_protocol.send_field('abort')
self._exchange_protocol.send_field(transaction_id)
def commit(self, test_id, transaction_id, storage_tid_dict):
self._exchange_protocol.send_field('commit')
self._exchange_protocol.send_field(transaction_id)
internal_storage_tid_dict = {}
for key, value in storage_tid_dict.iteritems():
internal_storage_tid_dict['%s_%s' % (test_id, key)] = value
self._exchange_protocol.send_dict(internal_storage_tid_dict)
class TestTIDServerV2:
def __init__(self, address, port):
self._client = TIDClient((address, port))
def assertEqual(self, value, target):
assert value == target, 'value %r does not match target %r' % (value, target)
def testInitialValue(self, test_id):
"""
Check that the storage is empty
"""
self.assertEqual(self._client.dump_all(), {})
def testScenario1(self, test_id):
"""
Simple begin - commit case.
"""
storage_tid_dict = {'s1': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', storage_tid_dict)
self.assertEqual(self._client.dump(test_id), storage_tid_dict)
def testScenario2(self, test_id):
"""
Simple begin - abort case.
"""
storage_tid_dict = {'s1': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.abort(test_id, 't1')
self.assertEqual(self._client.dump(test_id), {})
def testScenario3(self, test_id):
"""
2 concurrent transactions impacting a common storage.
Second transaction begins after first does, and commits before
first does.
"""
t1_storage_tid_dict = {'s1': 1, 's2': 1}
t2_storage_tid_dict = {'s1': 2, 's3': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1})
def testScenario4(self, test_id):
"""
3 concurrent transactions.
Transactions 1 and 2 impact the same storage s1.
Transaction 3 impacts storage s3 after transaction 2 committed.
Still, as storage s3 was part of a transaction which could not be
committed yet, it must not be committed until all blocking transactions
(here, t1) have ended.
"""
t1_storage_tid_dict = {'s1': 1, 's2': 2}
t2_storage_tid_dict = {'s1': 2, 's3': 1}
t3_storage_tid_dict = {'s3': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1})
def testScenario4bis(self, test_id):
"""
3 concurrent transactions.
Transactions 1 and 2 impact the same storage s1.
Transaction 3 impacts storage s3 after transaction 2 committed.
Still, as storage s3 was part of a transaction which could not be
committed yet, it must not be committed until all blocking transactions
(here, t1) have ended.
In this version, t1 aborts: for example, tpc_vote failed. As the data
was already sent to the storage, it might already be present on disk
(and anyway, the tid is not to be reused), so it is valid for t2 to
commit with tid 2 even though t1 aborted tid 1.
"""
t1_storage_tid_dict = {'s1': 1, 's2': 2}
t2_storage_tid_dict = {'s1': 2, 's3': 1}
t3_storage_tid_dict = {'s3': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.abort(test_id, 't1')
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1})
def testScenario5(self, test_id):
"""
2 concurrent transactions impacting a common storage.
Second transaction begins after first does, and commits after
first does.
"""
t1_storage_tid_dict = {'s1': 2}
t2_storage_tid_dict = {'s1': 1, 's2': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1})
def testScenario6(self, test_id):
"""
2 concurrent transactions impacting separate sets of storages.
Check that the first commit impacts dump data immediately.
"""
t1_storage_tid_dict = {'s1': 1}
t2_storage_tid_dict = {'s2': 1}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 1})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1})
def testScenario7(self, test_id):
"""
3 concurrent transactions.
t1 and t2 impact disjoint sets of storages.
t3 impacts a set of storages containing the ones from t1 and the ones
from t2.
Check that nothing impacts dump data until everything is committed.
"""
t1_storage_tid_dict = {'s1': 1}
t2_storage_tid_dict = {'s2': 2}
t3_storage_tid_dict = {'s1': 2, 's2': 2}
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't2', t2_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {})
self._client.commit(test_id, 't3', t3_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2})
def testScenario8(self, test_id):
"""
Simple increase case.
"""
self.assertEqual(self._client.dump(test_id), {})
t1_storage_tid_dict = {}
for s1_value in (1, 2):
previous_t1_storage_tid_dict = t1_storage_tid_dict
t1_storage_tid_dict = {'s1': s1_value}
self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
self.assertEqual(self._client.dump(test_id), previous_t1_storage_tid_dict)
self._client.commit(test_id, 't1', t1_storage_tid_dict)
self.assertEqual(self._client.dump(test_id), t1_storage_tid_dict)
def run(self):
for test_method_id in [x for x in dir(self) if x.startswith('test')]:
self.log("Runing %s..." % (test_method_id, ))
try:
try:
getattr(self, test_method_id)(test_id=test_method_id)
except AssertionError:
self.log('F\n')
self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
finally:
self.log('\n')
def log(self, message):
sys.stdout.write(message)
test = TestTIDServerV2(address, port)
test.run()
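# Typical invocation (editor's example; the script name is hypothetical):
#   python test_tidstorage.py 127.0.0.1 9001
# Each test method prints its name, then 'F' and a traceback on failure.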
############################################################################
#
# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from ExchangeProtocol import ExchangeProtocol
from transaction._transaction import Transaction
from zLOG import LOG, WARNING
import socket
import thread
import struct
import sys
GET_LAST_COMMITED_TID_METHOD_ID = 'getLastCommitedTID'
TID_STORAGE_ADDRESS = ('127.0.0.1', 9001)
tid_storage = None
zope_identifier = None
# Borrowed from CMFActivity.ActivityTool.getCurrentNode
def getZopeId():
""" Return current node in form ip:port """
global zope_identifier
if zope_identifier is None:
port = ''
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'port'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
server_class = str(getattr(v, '__class__', 'unknown'))
if server_class == 'ZServer.HTTPServer.zhttp_server':
port = v.port
break
assert port != '', 'zhttp_server not started yet'
ip = socket.gethostbyname(socket.gethostname())
if TID_STORAGE_ADDRESS[0] != '127.0.0.1':
assert ip != '127.0.0.1', 'self address must not be 127.0.0.1 if TIDStorage is remote'
zope_identifier = '%s:%s' %(ip, port)
return zope_identifier
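# Editor's example (illustrative values): on a host resolving to 192.168.0.10
# with a zhttp_server bound to port 8080, getZopeId() returns
# '192.168.0.10:8080'.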
def getFilestorageList(resource_list):
return getFilestorageToTIDMapping(resource_list).keys()
def getFilestorageToTIDMapping(resource_list):
datafs_tid_update_dict = {}
for resource in resource_list:
storage = getattr(resource, '_storage', None)
if storage is not None:
getLastCommitedTID = getattr(storage, GET_LAST_COMMITED_TID_METHOD_ID,
None)
if getLastCommitedTID is not None:
tid = getLastCommitedTID()
_addr = tuple([tuple(x) for x in getattr(storage, '_addr', [])])
_storage = getattr(storage, '_storage', '')
datafs_id = repr((_addr, _storage))
assert datafs_id not in datafs_tid_update_dict
if tid is None:
datafs_tid_update_dict[datafs_id] = None
else:
# unpack stolen from ZODB/utils.py:u64
datafs_tid_update_dict[datafs_id] = struct.unpack(">Q", tid)[0]
return datafs_tid_update_dict
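# Editor's worked example of the u64-style unpack above: a TID is an 8-byte
# big-endian integer, so
#   struct.unpack(">Q", '\x00\x00\x00\x00\x00\x00\x01\x02')[0] == 258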
class BufferedSocket:
"""
Write-only thread-safe buffered socket.
Attempts to reconnect at most once per flush.
"""
_socket_lock = thread.allocate_lock()
_connected = False
def __init__(self, address):
self._socket = socket.socket()
self._address = address
self._send_buffer_dict = {}
def _connect(self):
try:
self._socket.connect(self._address)
self._notifyConnected()
except socket.error, message:
# We don't want an error line per failed connection attempt, to avoid
# flooding the logfile.
pass
def _getSendBuffer(self, ident):
send_buffer = self._send_buffer_dict.get(ident)
if send_buffer is None:
send_buffer = self._send_buffer_dict[ident] = []
return send_buffer
def _notifyDisconnected(self, message):
if self._connected:
self._connected = False
LOG('TIDStorage', WARNING, 'Disconnected: %s' % (message, ))
def _notifyConnected(self):
if not self._connected:
self._connected = True
# Display a log message at WARNING level, so that reconnection messages
# are visible whenever disconnection messages are, even though this is
# not, properly speaking, a warning.
LOG('TIDStorage', WARNING, 'Connected')
def send(self, to_send):
send_buffer = self._getSendBuffer(thread.get_ident())
send_buffer.append(to_send)
def flush(self):
"""
Flush the send buffer and actually send data, with extra checks to behave
nicely if the connection is broken.
Do not retry sending if something goes wrong (the data is then lost!).
Here, the most important thing is speed, not data.
Usage is serialized via a class-level lock.
"""
ident = thread.get_ident()
self._socket_lock.acquire()
try:
if not self._connected:
self._connect()
if self._connected:
try:
self._socket.sendall(''.join(self._getSendBuffer(ident)))
except socket.error, message:
self._notifyDisconnected(message)
try:
self._socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
# Close the old socket in all cases before creating a fresh one,
# otherwise its file descriptor would leak when shutdown succeeds.
self._socket.close()
self._socket = socket.socket()
finally:
self._socket_lock.release()
self._send_buffer_dict[ident] = []
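# Minimal usage sketch for BufferedSocket (editor's example, made-up address;
# writes are buffered per calling thread until flush()):
#   bs = BufferedSocket(('127.0.0.1', 9001))
#   bs.send('dump\n')
#   bs.flush()  # actually sends, reconnecting at most once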
class TIDClient:
def __init__(self, address):
self._buffered_socket = BufferedSocket(address)
self._field_exchange = ExchangeProtocol(socket=self._buffered_socket)
def commit(self, tid_update_dict):
"""
Send given dict to TIDStorage server.
"""
self._send_command('commit')
self._field_exchange.send_dict(tid_update_dict)
self._buffered_socket.flush()
def begin(self, storage_id_list):
"""
Inform TIDStorage connection tracking that commit was initiated.
"""
self._send_command('begin')
self._field_exchange.send_list(storage_id_list)
self._buffered_socket.flush()
def abort(self):
"""
Inform TIDStorage connection tracking that commit was aborted.
"""
self._send_command('abort')
self._buffered_socket.flush()
def _send_command(self, command):
"""
Every command must be followed by an identifier.
This identifier is used to track transactions, so the same identifier
must not be used twice at the same time, but can be reused later.
"""
self._field_exchange.send_field(command)
self._field_exchange.send_field('%s_%x' % (getZopeId(), thread.get_ident()))
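# Editor's illustration of the resulting wire format for a begin
# (ExchangeProtocol terminates every field with '\n'; values are made up):
#   begin\n
#   192.168.0.10:8080_b7f31000\n   (getZopeId() + '_' + hex thread ident)
#   1\n                            (list length sent by send_list)
#   mystorage\n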
original__commitResources = Transaction._commitResources
def _commitResources(self, *args, **kw):
"""
Hook Transaction's _commitResources.
Before:
- Initialise TIDClient if needed
- Check if there is any storage we are interested in in current commit
- If so, issue a begin
After (2 cases):
- original__commitResources raised:
- Issue an abort
- otherwise:
- Issue a commit
Note to editors: prevent your code from raising anything! This method
MUST NOT raise any exception, except that it MUST NOT hide any exception
raised by original__commitResources.
"""
has_storages = False
try:
global tid_storage
if tid_storage is None:
tid_storage = TIDClient(TID_STORAGE_ADDRESS)
filestorage_list = getFilestorageList(self._resources)
if len(filestorage_list):
has_storages = True
tid_storage.begin(filestorage_list)
except:
LOG('TIDStorage _commitResources', WARNING, 'Exception in begin phase', error=sys.exc_info())
try:
result = original__commitResources(self, *args, **kw)
except:
if has_storages:
exception = sys.exc_info()
try:
tid_storage.abort()
except:
LOG('TIDStorage _commitResources', WARNING, 'Exception in abort phase', error=sys.exc_info())
# Re-raise the original exception, in case tid_storage.abort tainted the
# last exception value.
raise exception[0], exception[1], exception[2]
else:
raise
else:
if has_storages:
# Now that everything has been committed, all exceptions related to the
# added code must be swallowed (but still reported) to avoid confusing
# the transaction system.
try:
tid_storage.commit(getFilestorageToTIDMapping(self._resources))
except:
LOG('TIDStorage _commitResources', WARNING, 'Exception in commit phase', error=sys.exc_info())
return result
Transaction._commitResources = _commitResources
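# Editor's note: the patch takes effect as an import side effect; importing
# this module replaces Transaction._commitResources for the whole process.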
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Checks the sanity of a TIDStorage TID log provided via stdin.
# Exit status:
# 0 Success
# 1 Failure
# Error is displayed on stderr.
# On success, the final tid value for each storage is displayed on stdout.
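# Editor's illustrative input (made-up timestamps and values):
#   1199145600.0 f {'storage1': 10}
#   1199145630.5 d {'storage1': 11}
# and the corresponding stdout on success:
#   'storage1' 11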
import sys
content = {}
last_timestamp = None
line = sys.stdin.readline()
while line != '':
split_line = line.split(' ', 2)
assert len(split_line) == 3, repr(split_line)
line_timestamp, line_type, line_dict = split_line
line_timestamp = float(line_timestamp)
assert line_type in ('f', 'd'), repr(line_type)
if last_timestamp is None:
last_timestamp = line_timestamp
else:
assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
line_dict = eval(line_dict, None)
assert isinstance(line_dict, dict), type(line_dict)
assert len(line_dict), repr(line_dict)
if line_type == 'd':
for key, value in line_dict.iteritems():
if key in content:
assert content[key] < value, '%r < %r' % (content[key], value)
content[key] = value
elif line_type == 'f':
for key, value in content.iteritems():
assert key in line_dict, repr(key)
assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
content = line_dict
line = sys.stdin.readline()
key_list = content.keys()
key_list.sort()
for key in key_list:
print '%r %r' % (key, content[key])
#!/usr/bin/python
##############################################################################
#
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# Transforms a TIDStorage TID log provided on stdin, which might contain
# full statuses and/or incremental changes, and provides a version on stdout
# containing only full statuses.
# Also performs sanity checks on the given file.
# Exit status:
# 0 Success
# 1 Failure
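# Editor's illustrative transformation (made-up values): the input
#   1199145600.0 f {'s1': 10}
#   1199145630.5 d {'s1': 11}
# becomes
#   1199145600.0 f {'s1': 10}
#   1199145630.5 f {'s1': 11}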
import sys
content = {}
last_timestamp = None
line = sys.stdin.readline()
while line != '':
split_line = line.split(' ', 2)
assert len(split_line) == 3, repr(split_line)
line_timestamp, line_type, line_dict = split_line
line_timestamp = float(line_timestamp)
assert line_type in ('f', 'd'), repr(line_type)
if last_timestamp is None:
last_timestamp = line_timestamp
else:
assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
line_dict = eval(line_dict, None)
assert isinstance(line_dict, dict), type(line_dict)
assert len(line_dict), repr(line_dict)
if line_type == 'd':
for key, value in line_dict.iteritems():
if key in content:
assert content[key] < value, '%r < %r' % (content[key], value)
content[key] = value
print '%r f %r' % (line_timestamp, content)
elif line_type == 'f':
for key, value in content.iteritems():
assert key in line_dict, repr(key)
assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
content = line_dict
print line.strip()
line = sys.stdin.readline()