Commit fd80cc30 authored by Julien Muchembled

Add support for custom compression levels

parent 6f855eef
......@@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cPickle import dumps, loads
from zlib import compress, decompress
import heapq
import time
......@@ -26,6 +25,7 @@ if OLD_ZODB:
from persistent.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.util import makeChecksum, dump
......@@ -50,7 +50,6 @@ if SignalHandler:
import signal
SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)
class Application(ThreadedApplication):
"""The client node application."""
......@@ -99,7 +98,7 @@ class Application(ThreadedApplication):
# _connecting_to_master_node is used to prevent simultaneous master
# node connection attempts
self._connecting_to_master_node = Lock()
self.compress = compress
self.compress = getCompress(compress)
def __getattr__(self, attr):
if attr in ('last_tid', 'pt'):
......@@ -387,7 +386,7 @@ class Application(ThreadedApplication):
logging.error('wrong checksum from %s for oid %s',
conn, dump(oid))
raise NEOStorageReadRetry(False)
return (decompress(data) if compression else data,
return (decompress_list[compression](data),
tid, next_tid, data_tid)
raise NEOStorageCreationUndoneError(dump(oid))
return self._askStorageForRead(oid,
......@@ -434,17 +433,7 @@ class Application(ThreadedApplication):
checksum = ZERO_HASH
else:
assert data_serial is None
size = len(data)
if self.compress:
compressed_data = compress(data)
if size <= len(compressed_data):
compressed_data = data
compression = 0
else:
compression = 1
else:
compression = 0
compressed_data = data
size, compression, compressed_data = self.compress(data)
checksum = makeChecksum(compressed_data)
txn_context.data_size += size
# Store object in tmp cache
......
......@@ -14,10 +14,14 @@
Give the name of the cluster
</description>
</key>
<key name="compress" datatype="boolean">
<key name="compress" datatype=".compress">
<description>
If true, data is automatically compressed (unless compressed size is
not smaller). This is the default behaviour.
The value is either of 'boolean' type or an explicit algorithm that
matches the regex 'zlib(=\d+)?', where the optional number is
the compression level.
Any record that is not smaller once compressed is stored uncompressed.
True is the default and its meaning may change over time:
currently, it is the same as 'zlib'.
</description>
</key>
<key name="read-only" datatype="boolean">
......
......@@ -23,3 +23,11 @@ class NeoStorage(BaseConfig):
config = self.config
return Storage(**{k: getattr(config, k)
for k in config.getSectionAttributes()})
def compress(value):
    """Convert the 'compress' option of the configuration.

    The value is first interpreted as a boolean. If that fails, it is
    handed over to neo.lib.compress.parseOption, whose result is returned
    (any error it raises is propagated).
    """
    from ZConfig.datatypes import asBoolean
    try:
        result = asBoolean(value)
    except ValueError:
        # Not a boolean: it must be an explicit algorithm.
        from neo.lib.compress import parseOption
        result = parseOption(value)
    return result
......@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from zlib import decompress
from ZODB.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.compress import decompress_list
from neo.lib.protocol import Packets, uuid_str
from neo.lib.util import dump, makeChecksum
from neo.lib.exception import NodeNotReady
......@@ -129,8 +129,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
'wrong checksum while getting back data for'
' object %s during rebase of transaction %s'
% (dump(oid), dump(txn_context.ttid)))
if compression:
data = decompress(data)
data = decompress_list[compression](data)
size = len(data)
txn_context.data_size += size
if cached:
......
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import partial
import zlib
# Decompression functions, indexed by the compression id that comes with
# the data: 0 is for data that is not compressed, 1 is for zlib.
decompress_list = (lambda raw: raw, zlib.decompress)
def parseOption(value):
    """Parse an explicit 'compress' option: 'zlib' or 'zlib=<level>'.

    Return an (alg, level) pair, where alg is the index of the algorithm
    (0 for zlib, the only one that is recognized) and level is the
    requested compression level as an integer, or None if the option does
    not specify any.

    Raise ValueError if value does not have the expected form, or if the
    level is not between 1 and zlib.Z_BEST_COMPRESSION.
    """
    try:
        name, sep, level = value.partition('=')
        alg = ('zlib',).index(name)
    except (AttributeError, TypeError, ValueError):
        # Not a string, or unknown algorithm.
        raise ValueError("not a valid 'compress' option: %r" % value)
    if not sep:
        return alg, None
    # int() alone would also accept a sign or surrounding whitespace
    # ('+9', ' 9'), so only plain ASCII digits are let through.
    if not level or level.strip('0123456789'):
        raise ValueError("not a valid 'compress' option: %r" % value)
    level = int(level)
    if 0 < level <= zlib.Z_BEST_COMPRESSION:
        return alg, level
    raise ValueError("invalid compression level: %r" % level)
def getCompress(value):
    """Return the function to use to compress data, according to value.

    value is the parsed 'compress' option:
    - anything false disables compression;
    - True selects zlib without any explicit level;
    - otherwise, it must be an (alg, level) pair as returned by
      parseOption.

    The returned function takes the data and returns a
    (size, compression, data) triple, where size is the length of the
    original data, compression is the index in decompress_list of the
    function that reverts the operation, and data is what must be stored.
    Data that is not smaller once compressed is returned as is, with
    compression 0.
    """
    if not value:
        return lambda data: (len(data), 0, data)
    alg, level = (0, None) if value is True else value
    if level:
        # The level is passed positionally because zlib.compress() only
        # accepts it as a keyword argument since Python 3.6: on older
        # versions, partial(zlib.compress, level=level) fails with
        # TypeError as soon as it is called.
        def _compress(data):
            return zlib.compress(data, level)
    else:
        _compress = zlib.compress
    # Index 0 of decompress_list is reserved for uncompressed data.
    alg += 1
    assert 0 < alg < len(decompress_list), 'invalid compression algorithm'
    def compress(data):
        size = len(data)
        compressed = _compress(data)
        if len(compressed) < size:
            return size, alg, compressed
        return size, 0, data
    compress._compress = _compress # for testBasicStore
    return compress
......@@ -65,6 +65,7 @@ UNIT_TEST_MODULES = [
'neo.tests.client.testZODBURI',
# light functional tests
'neo.tests.threaded.test',
'neo.tests.threaded.testConfig',
'neo.tests.threaded.testImporter',
'neo.tests.threaded.testReplication',
'neo.tests.threaded.testSSL',
......
......@@ -25,7 +25,7 @@ from ZODB.POSException import POSKeyError
from . import buildDatabaseManager
from .manager import DatabaseManager
from neo.lib import logging, patch, util
from neo.lib import compress, logging, patch, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, MAX_TID
......@@ -297,7 +297,11 @@ class ImporterDatabaseManager(DatabaseManager):
main = {'adapter': 'MySQL', 'wait': 0}
main.update(config.items(sections.pop(0)))
self.zodb = ((x, dict(config.items(x))) for x in sections)
self.compress = main.get('compress', 1)
x = main.get('compress')
try:
self.compress = bool(x and ('false', 'true').index(x))
except ValueError:
self.compress = compress.parseOption(x)
self.db = buildDatabaseManager(main['adapter'],
(main['database'], main.get('engine'), main['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions
......@@ -375,11 +379,7 @@ class ImporterDatabaseManager(DatabaseManager):
if self._last_commit + 1 < time.time():
self.commit()
self.zodb_tid = u64(tid)
if self.compress:
from zlib import compress
else:
compress = None
compression = 0
_compress = compress.getCompress(self.compress)
object_list = []
data_id_list = []
while zodb_list:
......@@ -399,12 +399,7 @@ class ImporterDatabaseManager(DatabaseManager):
if data_tid or r.data is None:
data_id = None
else:
data = zodb.repickle(r.data)
if compress:
compressed_data = compress(data)
compression = len(compressed_data) < len(data)
if compression:
data = compressed_data
_, compression, data = _compress(zodb.repickle(r.data))
data_id = self.holdData(util.makeChecksum(data), oid, data,
compression)
data_id_list.append(data_id)
......
......@@ -23,7 +23,6 @@ import unittest
from collections import defaultdict
from contextlib import contextmanager
from thread import get_ident
from zlib import compress
from persistent import Persistent, GHOST
from transaction.interfaces import TransientError
from ZODB import DB, POSException
......@@ -66,6 +65,7 @@ class Test(NEOThreadedTest):
storage = cluster.getZODBStorage()
storage.sync()
storage.app.max_reconnection_to_master = 0
compress = storage.app.compress._compress
data_info = {}
compressible = 'x' * 20
compressed = compress(compressible)
......
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from contextlib import contextmanager
from ZConfig import ConfigurationSyntaxError
from ZODB.config import databaseFromString
from .. import Patch
from . import ClientApplication, NEOThreadedTest, with_cluster
from neo.client import Storage
def databaseFromDict(**kw):
    """Open a ZODB database from a <NEOStorage> section.

    Each keyword argument becomes a 'key value' line of the section and
    the resulting configuration is passed to databaseFromString.
    """
    options = ''.join(' %s %s\n' % item for item in kw.iteritems())
    return databaseFromString("%%import neo.client\n"
        "<zodb>\n <NEOStorage>\n%s </NEOStorage>\n</zodb>\n" % options)
class ConfigTests(NEOThreadedTest):
    """Tests about the options of the <NEOStorage> ZConfig section."""

    # Options that are always passed, for checks that only need the
    # configuration to be parsed.
    dummy_required = {'name': 'cluster', 'master_nodes': '127.0.0.1:10000'}

    @contextmanager
    def _db(self, cluster, **kw):
        """Open a database on `cluster` with the given extra options.

        The database is yielded and then closed on exit, after which the
        poll threads of the clients created meanwhile are passed to
        cluster.join.
        """
        kw['name'] = cluster.name
        kw['master_nodes'] = cluster.master_nodes
        # Replacement for Storage.Application: it builds a
        # ClientApplication and records its poll thread in `t`.
        def newClient(_, *args, **kw):
            client = ClientApplication(*args, **kw)
            t.append(client.poll_thread)
            return client
        t = []
        # NOTE(review): the patch is assumed to cover only the creation of
        # the database - confirm against the original indentation.
        with Patch(Storage, Application=newClient):
            db = databaseFromDict(**kw)
        try:
            yield db
        finally:
            db.close()
            cluster.join(t)

    @with_cluster()
    def testCompress(self, cluster):
        """Check which values of the 'compress' option are accepted."""
        kw = self.dummy_required.copy()
        valid = ['false', 'true', 'zlib', 'zlib=9']
        # Invalid values must be rejected when the configuration is parsed.
        # The loop assigns each of them directly to kw['compress'].
        for kw['compress'] in '9', 'best', 'zlib=0', 'zlib=100':
            self.assertRaises(ConfigurationSyntaxError, databaseFromDict, **kw)
        # With a valid value, it must be possible to open a database.
        for compress in valid:
            with self._db(cluster, compress=compress):
                pass
# Run the tests of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment