Commit 6ba5f606 authored by dieter's avatar dieter

add `fsdump/fsstats` test

parent 403f9869
......@@ -181,6 +181,8 @@ def main(path=None):
objects = 0
txn_bytes.add(size)
if objects:
txn_objects.add(objects)
f.close()
print("Summary: %d txns, %d objects, %d revisions" % (
......
##############################################################################
#
# Copyright (c) 2021 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from ZODB import DB
from ZODB.scripts.fsstats import rx_data
from ZODB.scripts.fsstats import rx_txn
from ZODB.tests.util import TestCase
from ZODB.tests.util import run_module_as_script
class FsdumpFsstatsTests(TestCase):
def setUp(self):
super(FsdumpFsstatsTests, self).setUp()
# create (empty) storage ``data.fs``
DB("data.fs").close()
def test_fsdump(self):
run_module_as_script("ZODB.FileStorage.fsdump", ["data.fs"])
# verify that ``fsstats`` will understand the output
with open("stdout") as f:
tno = obno = 0
for li in f:
if li.startswith(" data"):
m = rx_data.search(li)
if m is None:
continue
oid, size, klass = m.groups()
int(size)
obno += 1
elif li.startswith("Trans"):
m = rx_txn.search(li)
if not m:
continue
tid, size = m.groups()
size = int(size)
tno += 1
self.assertEqual(tno, 1)
self.assertEqual(obno, 1)
def test_fsstats(self):
# The ``fsstats`` output is complex
# currently, we just check the first (summary) line
run_module_as_script("ZODB.FileStorage.fsdump", ["data.fs"],
"data.dmp")
run_module_as_script("ZODB.scripts.fsstats", ["data.dmp"])
with open("stdout") as f:
self.assertEqual(f.readline().strip(),
"Summary: 1 txns, 1 objects, 1 revisions")
......@@ -16,9 +16,12 @@
from ZODB.MappingStorage import DB
import atexit
import doctest
import os
import pdb
import persistent
import re
import runpy
import sys
import tempfile
import time
......@@ -377,3 +380,28 @@ def with_high_concurrency(f):
restore()
return _
def run_module_as_script(mod, args, stdout="stdout", stderr="stderr"):
"""run module *mod* as script with arguments *arg*.
stdout and stderr are redirected to files given by the
correcponding parameters.
The function is usually called in a ``setUp/tearDown`` frame
which will remove the created files.
"""
sargv, sout, serr = sys.argv, sys.stdout, sys.stderr
s_set_trace = pdb.set_trace
try:
sys.argv = [sargv[0]] + args
sys.stdout = open(stdout, "w")
sys.stderr = open(stderr, "w")
# to allow debugging
pdb.set_trace = doctest._OutputRedirectingPdb(sout)
runpy.run_module(mod, run_name="__main__", alter_sys=True)
finally:
sys.stdout.close()
sys.stderr.close()
pdb.set_trace = s_set_trace
sys.argv, sys.stdout, sys.stderr = sargv, sout, serr
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment