Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZODB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kirill Smelkov
ZODB
Commits
7ee7c920
Commit
7ee7c920
authored
May 14, 2010
by
Jim Fulton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Merged the tseaver-better_repozo_tests branch.
parent
c36d4c48
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
599 additions
and
95 deletions
+599
-95
src/ZODB/scripts/repozo.py
src/ZODB/scripts/repozo.py
+13
-6
src/ZODB/scripts/tests/test_repozo.py
src/ZODB/scripts/tests/test_repozo.py
+586
-89
No files found.
src/ZODB/scripts/repozo.py
View file @
7ee7c920
...
...
@@ -93,6 +93,9 @@ READCHUNK = 16 * 1024
VERBOSE
=
False
class
WouldOverwriteFiles
(
Exception
):
pass
def
usage
(
code
,
msg
=
''
):
outfp
=
sys
.
stderr
if
code
==
0
:
...
...
@@ -301,7 +304,9 @@ def gen_filename(options, ext=None):
ext
=
'.deltafs'
if
options
.
gzip
:
ext
+=
'z'
t
=
time
.
gmtime
()[:
6
]
+
(
ext
,)
# Hook for testing
now
=
getattr
(
options
,
'test_now'
,
time
.
gmtime
()[:
6
])
t
=
now
+
(
ext
,)
return
'%04d-%02d-%02d-%02d-%02d-%02d%s'
%
t
# Return a list of files needed to reproduce state at time options.date.
...
...
@@ -419,8 +424,7 @@ def do_full_backup(options):
options
.
full
=
True
dest
=
os
.
path
.
join
(
options
.
repository
,
gen_filename
(
options
))
if
os
.
path
.
exists
(
dest
):
print
>>
sys
.
stderr
,
'Cannot overwrite existing file:'
,
dest
sys
.
exit
(
2
)
raise
WouldOverwriteFiles
(
'Cannot overwrite existing file: %s'
%
dest
)
log
(
'writing full backup: %s bytes to %s'
,
pos
,
dest
)
sum
=
copyfile
(
options
,
dest
,
0
,
pos
)
# Write the data file for this full backup
...
...
@@ -447,8 +451,7 @@ def do_incremental_backup(options, reposz, repofiles):
options
.
full
=
False
dest
=
os
.
path
.
join
(
options
.
repository
,
gen_filename
(
options
))
if
os
.
path
.
exists
(
dest
):
print
>>
sys
.
stderr
,
'Cannot overwrite existing file:'
,
dest
sys
.
exit
(
2
)
raise
WouldOverwriteFiles
(
'Cannot overwrite existing file: %s'
%
dest
)
log
(
'writing incremental: %s bytes to %s'
,
pos
-
reposz
,
dest
)
sum
=
copyfile
(
options
,
dest
,
reposz
,
pos
-
reposz
)
# The first file in repofiles points to the last full backup. Use this to
...
...
@@ -570,7 +573,11 @@ def main(argv=None):
argv
=
sys
.
argv
[
1
:]
options
=
parseargs
(
argv
)
if
options
.
mode
==
BACKUP
:
do_backup
(
options
)
try
:
do_backup
(
options
)
except
WouldOverwriteFiles
,
e
:
print
>>
sys
.
stderr
,
str
(
e
)
sys
.
exit
(
2
)
else
:
assert
options
.
mode
==
RECOVER
do_recover
(
options
)
...
...
src/ZODB/scripts/tests/test_repozo.py
View file @
7ee7c920
...
...
@@ -13,12 +13,21 @@
##############################################################################
import
unittest
import
os
try
:
# the hashlib package is available from Python 2.5
from
hashlib
import
md5
except
ImportError
:
# the md5 package is deprecated in Python 2.6
from
md5
import
new
as
md5
import
ZODB.tests.util
# layer used at class scope
_NOISY
=
os
.
environ
.
get
(
'NOISY_REPOZO_TEST_OUTPUT'
)
class
OurDB
:
_file_name
=
None
def
__init__
(
self
,
dir
):
from
BTrees.OOBTree
import
OOBTree
import
transaction
...
...
@@ -27,12 +36,13 @@ class OurDB:
conn
=
self
.
db
.
open
()
conn
.
root
()[
'tree'
]
=
OOBTree
()
transaction
.
commit
()
self
.
pos
=
self
.
db
.
storage
.
_pos
self
.
close
()
def
getdb
(
self
):
from
ZODB
import
DB
from
ZODB.FileStorage
import
FileStorage
storage_filename
=
os
.
path
.
join
(
self
.
dir
,
'Data.fs'
)
s
elf
.
_file_name
=
s
torage_filename
=
os
.
path
.
join
(
self
.
dir
,
'Data.fs'
)
storage
=
FileStorage
(
storage_filename
)
self
.
db
=
DB
(
storage
)
...
...
@@ -63,10 +73,573 @@ class OurDB:
if
keys
:
del
tree
[
keys
[
0
]]
transaction
.
commit
()
self
.
pos
=
self
.
db
.
storage
.
_pos
self
.
close
()
class
RepozoTests
(
unittest
.
TestCase
):
class
FileopsBase
:
def
_makeChunks
(
self
):
from
ZODB.scripts.repozo
import
READCHUNK
return
[
'x'
*
READCHUNK
,
'y'
*
READCHUNK
,
'z'
]
def
_makeFile
(
self
,
text
=
None
):
from
StringIO
import
StringIO
if
text
is
None
:
text
=
''
.
join
(
self
.
_makeChunks
())
return
StringIO
(
text
)
class
Test_dofile
(
unittest
.
TestCase
,
FileopsBase
):
def
_callFUT
(
self
,
func
,
fp
,
n
):
from
ZODB.scripts.repozo
import
dofile
return
dofile
(
func
,
fp
,
n
)
def
test_empty_read_all
(
self
):
chunks
=
[]
file
=
self
.
_makeFile
(
''
)
bytes
=
self
.
_callFUT
(
chunks
.
append
,
file
,
None
)
self
.
assertEqual
(
bytes
,
0
)
self
.
assertEqual
(
chunks
,
[])
def
test_empty_read_count
(
self
):
chunks
=
[]
file
=
self
.
_makeFile
(
''
)
bytes
=
self
.
_callFUT
(
chunks
.
append
,
file
,
42
)
self
.
assertEqual
(
bytes
,
0
)
self
.
assertEqual
(
chunks
,
[])
def
test_nonempty_read_all
(
self
):
chunks
=
[]
file
=
self
.
_makeFile
()
bytes
=
self
.
_callFUT
(
chunks
.
append
,
file
,
None
)
self
.
assertEqual
(
bytes
,
file
.
tell
())
self
.
assertEqual
(
chunks
,
self
.
_makeChunks
())
def
test_nonempty_read_count
(
self
):
chunks
=
[]
file
=
self
.
_makeFile
()
bytes
=
self
.
_callFUT
(
chunks
.
append
,
file
,
42
)
self
.
assertEqual
(
bytes
,
42
)
self
.
assertEqual
(
chunks
,
[
'x'
*
42
])
class
Test_checksum
(
unittest
.
TestCase
,
FileopsBase
):
def
_callFUT
(
self
,
fp
,
n
):
from
ZODB.scripts.repozo
import
checksum
return
checksum
(
fp
,
n
)
def
test_empty_read_all
(
self
):
file
=
self
.
_makeFile
(
''
)
sum
=
self
.
_callFUT
(
file
,
None
)
self
.
assertEqual
(
sum
,
md5
(
''
).
hexdigest
())
def
test_empty_read_count
(
self
):
file
=
self
.
_makeFile
(
''
)
sum
=
self
.
_callFUT
(
file
,
42
)
self
.
assertEqual
(
sum
,
md5
(
''
).
hexdigest
())
def
test_nonempty_read_all
(
self
):
file
=
self
.
_makeFile
()
sum
=
self
.
_callFUT
(
file
,
None
)
self
.
assertEqual
(
sum
,
md5
(
''
.
join
(
self
.
_makeChunks
())).
hexdigest
())
def
test_nonempty_read_count
(
self
):
chunks
=
[]
file
=
self
.
_makeFile
()
sum
=
self
.
_callFUT
(
file
,
42
)
self
.
assertEqual
(
sum
,
md5
(
'x'
*
42
).
hexdigest
())
class
OptionsTestBase
:
_repository_directory
=
None
_data_directory
=
None
def
tearDown
(
self
):
if
self
.
_repository_directory
is
not
None
:
from
shutil
import
rmtree
rmtree
(
self
.
_repository_directory
)
if
self
.
_data_directory
is
not
None
:
from
shutil
import
rmtree
rmtree
(
self
.
_data_directory
)
def
_makeOptions
(
self
,
**
kw
):
import
tempfile
self
.
_repository_directory
=
tempfile
.
mkdtemp
()
class
Options
(
object
):
repository
=
self
.
_repository_directory
def
__init__
(
self
,
**
kw
):
self
.
__dict__
.
update
(
kw
)
return
Options
(
**
kw
)
class
Test_copyfile
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
options
,
dest
,
start
,
n
):
from
ZODB.scripts.repozo
import
copyfile
return
copyfile
(
options
,
dest
,
start
,
n
)
def
test_no_gzip
(
self
):
options
=
self
.
_makeOptions
(
gzip
=
False
)
source
=
options
.
file
=
os
.
path
.
join
(
self
.
_repository_directory
,
'source.txt'
)
f
=
open
(
source
,
'wb'
)
f
.
write
(
'x'
*
1000
)
f
.
close
()
target
=
os
.
path
.
join
(
self
.
_repository_directory
,
'target.txt'
)
sum
=
self
.
_callFUT
(
options
,
target
,
0
,
100
)
self
.
assertEqual
(
sum
,
md5
(
'x'
*
100
).
hexdigest
())
self
.
assertEqual
(
open
(
target
,
'rb'
).
read
(),
'x'
*
100
)
def
test_w_gzip
(
self
):
import
gzip
options
=
self
.
_makeOptions
(
gzip
=
True
)
source
=
options
.
file
=
os
.
path
.
join
(
self
.
_repository_directory
,
'source.txt'
)
f
=
open
(
source
,
'wb'
)
f
.
write
(
'x'
*
1000
)
f
.
close
()
target
=
os
.
path
.
join
(
self
.
_repository_directory
,
'target.txt'
)
sum
=
self
.
_callFUT
(
options
,
target
,
0
,
100
)
self
.
assertEqual
(
sum
,
md5
(
'x'
*
100
).
hexdigest
())
self
.
assertEqual
(
gzip
.
open
(
target
,
'rb'
).
read
(),
'x'
*
100
)
class
Test_concat
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
files
,
ofp
):
from
ZODB.scripts.repozo
import
concat
return
concat
(
files
,
ofp
)
def
_makeFile
(
self
,
name
,
text
,
gzip_file
=
False
):
import
gzip
import
tempfile
if
self
.
_repository_directory
is
None
:
self
.
_repository_directory
=
tempfile
.
mkdtemp
()
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
if
gzip_file
:
f
=
gzip
.
open
(
fqn
,
'wb'
)
else
:
f
=
open
(
fqn
,
'wb'
)
f
.
write
(
text
)
f
.
flush
()
f
.
close
()
return
fqn
def
test_empty_list_no_ofp
(
self
):
bytes
,
sum
=
self
.
_callFUT
([],
None
)
self
.
assertEqual
(
bytes
,
0
)
self
.
assertEqual
(
sum
,
md5
(
''
).
hexdigest
())
def
test_w_plain_files_no_ofp
(
self
):
files
=
[
self
.
_makeFile
(
x
,
x
,
False
)
for
x
in
'ABC'
]
bytes
,
sum
=
self
.
_callFUT
(
files
,
None
)
self
.
assertEqual
(
bytes
,
3
)
self
.
assertEqual
(
sum
,
md5
(
'ABC'
).
hexdigest
())
def
test_w_gzipped_files_no_ofp
(
self
):
files
=
[
self
.
_makeFile
(
'%s.fsz'
%
x
,
x
,
True
)
for
x
in
'ABC'
]
bytes
,
sum
=
self
.
_callFUT
(
files
,
None
)
self
.
assertEqual
(
bytes
,
3
)
self
.
assertEqual
(
sum
,
md5
(
'ABC'
).
hexdigest
())
def
test_w_ofp
(
self
):
class
Faux
:
_closed
=
False
def
__init__
(
self
):
self
.
_written
=
[]
def
write
(
self
,
data
):
self
.
_written
.
append
(
data
)
def
close
(
self
):
self
.
_closed
=
True
files
=
[
self
.
_makeFile
(
x
,
x
,
False
)
for
x
in
'ABC'
]
ofp
=
Faux
()
bytes
,
sum
=
self
.
_callFUT
(
files
,
ofp
)
self
.
assertEqual
(
ofp
.
_written
,
[
x
for
x
in
'ABC'
])
self
.
failUnless
(
ofp
.
_closed
)
_marker
=
object
()
class
Test_gen_filename
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
options
,
ext
=
_marker
):
from
ZODB.scripts.repozo
import
gen_filename
if
ext
is
_marker
:
return
gen_filename
(
options
)
return
gen_filename
(
options
,
ext
)
def
test_explicit_ext
(
self
):
options
=
self
.
_makeOptions
(
test_now
=
(
2010
,
5
,
14
,
12
,
52
,
31
))
fn
=
self
.
_callFUT
(
options
,
'.txt'
)
self
.
assertEqual
(
fn
,
'2010-05-14-12-52-31.txt'
)
def
test_full_no_gzip
(
self
):
options
=
self
.
_makeOptions
(
test_now
=
(
2010
,
5
,
14
,
12
,
52
,
31
),
full
=
True
,
gzip
=
False
,
)
fn
=
self
.
_callFUT
(
options
)
self
.
assertEqual
(
fn
,
'2010-05-14-12-52-31.fs'
)
def
test_full_w_gzip
(
self
):
options
=
self
.
_makeOptions
(
test_now
=
(
2010
,
5
,
14
,
12
,
52
,
31
),
full
=
True
,
gzip
=
True
,
)
fn
=
self
.
_callFUT
(
options
)
self
.
assertEqual
(
fn
,
'2010-05-14-12-52-31.fsz'
)
def
test_incr_no_gzip
(
self
):
options
=
self
.
_makeOptions
(
test_now
=
(
2010
,
5
,
14
,
12
,
52
,
31
),
full
=
False
,
gzip
=
False
,
)
fn
=
self
.
_callFUT
(
options
)
self
.
assertEqual
(
fn
,
'2010-05-14-12-52-31.deltafs'
)
def
test_incr_w_gzip
(
self
):
options
=
self
.
_makeOptions
(
test_now
=
(
2010
,
5
,
14
,
12
,
52
,
31
),
full
=
False
,
gzip
=
True
,
)
fn
=
self
.
_callFUT
(
options
)
self
.
assertEqual
(
fn
,
'2010-05-14-12-52-31.deltafsz'
)
class
Test_find_files
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
options
):
from
ZODB.scripts.repozo
import
find_files
return
find_files
(
options
)
def
_makeFile
(
self
,
hour
,
min
,
sec
,
ext
):
# call _makeOptions first!
name
=
'2010-05-14-%02d-%02d-%02d%s'
%
(
hour
,
min
,
sec
,
ext
)
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
f
=
open
(
fqn
,
'wb'
)
f
.
write
(
name
)
f
.
flush
()
f
.
close
()
return
fqn
def
test_explicit_date
(
self
):
options
=
self
.
_makeOptions
(
date
=
'2010-05-14-13-30-57'
)
files
=
[]
for
h
,
m
,
s
,
e
in
[(
2
,
13
,
14
,
'.fs'
),
(
2
,
13
,
14
,
'.dat'
),
(
3
,
14
,
15
,
'.deltafs'
),
(
4
,
14
,
15
,
'.deltafs'
),
(
5
,
14
,
15
,
'.deltafs'
),
(
12
,
13
,
14
,
'.fs'
),
(
12
,
13
,
14
,
'.dat'
),
(
13
,
14
,
15
,
'.deltafs'
),
(
14
,
15
,
16
,
'.deltafs'
),
]:
files
.
append
(
self
.
_makeFile
(
h
,
m
,
s
,
e
))
found
=
self
.
_callFUT
(
options
)
# Older files, .dat file not included
self
.
assertEqual
(
found
,
[
files
[
5
],
files
[
7
]])
def
test_using_gen_filename
(
self
):
options
=
self
.
_makeOptions
(
date
=
None
,
test_now
=
(
2010
,
5
,
14
,
13
,
30
,
57
))
files
=
[]
for
h
,
m
,
s
,
e
in
[(
2
,
13
,
14
,
'.fs'
),
(
2
,
13
,
14
,
'.dat'
),
(
3
,
14
,
15
,
'.deltafs'
),
(
4
,
14
,
15
,
'.deltafs'
),
(
5
,
14
,
15
,
'.deltafs'
),
(
12
,
13
,
14
,
'.fs'
),
(
12
,
13
,
14
,
'.dat'
),
(
13
,
14
,
15
,
'.deltafs'
),
(
14
,
15
,
16
,
'.deltafs'
),
]:
files
.
append
(
self
.
_makeFile
(
h
,
m
,
s
,
e
))
found
=
self
.
_callFUT
(
options
)
# Older files, .dat file not included
self
.
assertEqual
(
found
,
[
files
[
5
],
files
[
7
]])
class
Test_scandat
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
repofiles
):
from
ZODB.scripts.repozo
import
scandat
return
scandat
(
repofiles
)
def
test_no_dat_file
(
self
):
options
=
self
.
_makeOptions
()
fsfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.fs'
)
fn
,
startpos
,
endpos
,
sum
=
self
.
_callFUT
([
fsfile
])
self
.
assertEqual
(
fn
,
None
)
self
.
assertEqual
(
startpos
,
None
)
self
.
assertEqual
(
endpos
,
None
)
self
.
assertEqual
(
sum
,
None
)
def
test_empty_dat_file
(
self
):
options
=
self
.
_makeOptions
()
fsfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.fs'
)
datfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.dat'
)
open
(
datfile
,
'wb'
).
close
()
fn
,
startpos
,
endpos
,
sum
=
self
.
_callFUT
([
fsfile
])
self
.
assertEqual
(
fn
,
None
)
self
.
assertEqual
(
startpos
,
None
)
self
.
assertEqual
(
endpos
,
None
)
self
.
assertEqual
(
sum
,
None
)
def
test_single_line
(
self
):
options
=
self
.
_makeOptions
()
fsfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.fs'
)
datfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.dat'
)
f
=
open
(
datfile
,
'wb'
)
f
.
write
(
'foo.fs 0 123 ABC
\
n
'
)
f
.
flush
()
f
.
close
()
fn
,
startpos
,
endpos
,
sum
=
self
.
_callFUT
([
fsfile
])
self
.
assertEqual
(
fn
,
'foo.fs'
)
self
.
assertEqual
(
startpos
,
0
)
self
.
assertEqual
(
endpos
,
123
)
self
.
assertEqual
(
sum
,
'ABC'
)
def
test_multiple_lines
(
self
):
options
=
self
.
_makeOptions
()
fsfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.fs'
)
datfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'foo.dat'
)
f
=
open
(
datfile
,
'wb'
)
f
.
write
(
'foo.fs 0 123 ABC
\
n
'
)
f
.
write
(
'bar.deltafs 123 456 DEF
\
n
'
)
f
.
flush
()
f
.
close
()
fn
,
startpos
,
endpos
,
sum
=
self
.
_callFUT
([
fsfile
])
self
.
assertEqual
(
fn
,
'bar.deltafs'
)
self
.
assertEqual
(
startpos
,
123
)
self
.
assertEqual
(
endpos
,
456
)
self
.
assertEqual
(
sum
,
'DEF'
)
class
Test_delete_old_backups
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_makeOptions
(
self
,
filenames
=
()):
options
=
super
(
Test_delete_old_backups
,
self
).
_makeOptions
()
for
filename
in
filenames
:
fqn
=
os
.
path
.
join
(
options
.
repository
,
filename
)
f
=
open
(
fqn
,
'wb'
)
f
.
write
(
'testing delete_old_backups'
)
f
.
close
()
return
options
def
_callFUT
(
self
,
options
=
None
,
filenames
=
()):
from
ZODB.scripts.repozo
import
delete_old_backups
if
options
is
None
:
options
=
self
.
_makeOptions
(
filenames
)
return
delete_old_backups
(
options
)
def
test_empty_dir_doesnt_raise
(
self
):
self
.
_callFUT
()
self
.
assertEqual
(
len
(
os
.
listdir
(
self
.
_repository_directory
)),
0
)
def
test_no_repozo_files_doesnt_raise
(
self
):
FILENAMES
=
[
'bogus.txt'
,
'not_a_repozo_file'
]
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
FILENAMES
))
for
name
in
FILENAMES
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_doesnt_remove_current_repozo_files
(
self
):
FILENAMES
=
[
'2009-12-20-10-08-03.fs'
,
'2009-12-20-10-08-03.dat'
]
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
FILENAMES
))
for
name
in
FILENAMES
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_removes_older_repozo_files
(
self
):
OLDER_FULL
=
[
'2009-12-20-00-01-03.fs'
,
'2009-12-20-00-01-03.dat'
]
DELTAS
=
[
'2009-12-21-00-00-01.deltafs'
,
'2009-12-22-00-00-01.deltafs'
]
CURRENT_FULL
=
[
'2009-12-23-00-00-01.fs'
,
'2009-12-23-00-00-01.dat'
]
FILENAMES
=
OLDER_FULL
+
DELTAS
+
CURRENT_FULL
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
CURRENT_FULL
))
for
name
in
OLDER_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
DELTAS
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
CURRENT_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_removes_older_repozo_files_zipped
(
self
):
OLDER_FULL
=
[
'2009-12-20-00-01-03.fsz'
,
'2009-12-20-00-01-03.dat'
]
DELTAS
=
[
'2009-12-21-00-00-01.deltafsz'
,
'2009-12-22-00-00-01.deltafsz'
]
CURRENT_FULL
=
[
'2009-12-23-00-00-01.fsz'
,
'2009-12-23-00-00-01.dat'
]
FILENAMES
=
OLDER_FULL
+
DELTAS
+
CURRENT_FULL
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
CURRENT_FULL
))
for
name
in
OLDER_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
DELTAS
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
CURRENT_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
class
Test_do_full_backup
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
options
):
from
ZODB.scripts.repozo
import
do_full_backup
return
do_full_backup
(
options
)
def
_makeDB
(
self
):
import
tempfile
datadir
=
self
.
_data_directory
=
tempfile
.
mkdtemp
()
return
OurDB
(
self
.
_data_directory
)
def
test_dont_overwrite_existing_file
(
self
):
from
ZODB.scripts.repozo
import
WouldOverwriteFiles
from
ZODB.scripts.repozo
import
gen_filename
db
=
self
.
_makeDB
()
options
=
self
.
_makeOptions
(
full
=
True
,
file
=
db
.
_file_name
,
gzip
=
False
,
test_now
=
(
2010
,
5
,
14
,
10
,
51
,
22
),
)
f
=
open
(
os
.
path
.
join
(
self
.
_repository_directory
,
gen_filename
(
options
)),
'w'
)
f
.
write
(
'TESTING'
)
f
.
flush
()
f
.
close
()
self
.
assertRaises
(
WouldOverwriteFiles
,
self
.
_callFUT
,
options
)
def
test_empty
(
self
):
from
ZODB.scripts.repozo
import
gen_filename
db
=
self
.
_makeDB
()
options
=
self
.
_makeOptions
(
file
=
db
.
_file_name
,
gzip
=
False
,
killold
=
False
,
test_now
=
(
2010
,
5
,
14
,
10
,
51
,
22
),
)
self
.
_callFUT
(
options
)
target
=
os
.
path
.
join
(
self
.
_repository_directory
,
gen_filename
(
options
))
original
=
open
(
db
.
_file_name
,
'rb'
).
read
()
self
.
assertEqual
(
open
(
target
,
'rb'
).
read
(),
original
)
datfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
gen_filename
(
options
,
'.dat'
))
self
.
assertEqual
(
open
(
datfile
).
read
(),
'%s 0 %d %s
\
n
'
%
(
target
,
len
(
original
),
md5
(
original
).
hexdigest
()))
class
Test_do_incremental_backup
(
OptionsTestBase
,
unittest
.
TestCase
):
def
_callFUT
(
self
,
options
,
reposz
,
repofiles
):
from
ZODB.scripts.repozo
import
do_incremental_backup
return
do_incremental_backup
(
options
,
reposz
,
repofiles
)
def
_makeDB
(
self
):
import
tempfile
datadir
=
self
.
_data_directory
=
tempfile
.
mkdtemp
()
return
OurDB
(
self
.
_data_directory
)
def
test_dont_overwrite_existing_file
(
self
):
from
ZODB.scripts.repozo
import
WouldOverwriteFiles
from
ZODB.scripts.repozo
import
gen_filename
from
ZODB.scripts.repozo
import
find_files
db
=
self
.
_makeDB
()
options
=
self
.
_makeOptions
(
full
=
False
,
file
=
db
.
_file_name
,
gzip
=
False
,
test_now
=
(
2010
,
5
,
14
,
10
,
51
,
22
),
date
=
None
,
)
f
=
open
(
os
.
path
.
join
(
self
.
_repository_directory
,
gen_filename
(
options
)),
'w'
)
f
.
write
(
'TESTING'
)
f
.
flush
()
f
.
close
()
repofiles
=
find_files
(
options
)
self
.
assertRaises
(
WouldOverwriteFiles
,
self
.
_callFUT
,
options
,
0
,
repofiles
)
def
test_no_changes
(
self
):
from
ZODB.scripts.repozo
import
gen_filename
db
=
self
.
_makeDB
()
oldpos
=
db
.
pos
options
=
self
.
_makeOptions
(
file
=
db
.
_file_name
,
gzip
=
False
,
killold
=
False
,
test_now
=
(
2010
,
5
,
14
,
10
,
51
,
22
),
date
=
None
,
)
fullfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'2010-05-14-00-00-00.fs'
)
original
=
open
(
db
.
_file_name
,
'rb'
).
read
()
last
=
len
(
original
)
f
=
open
(
fullfile
,
'wb'
)
f
.
write
(
original
)
f
.
flush
()
f
.
close
()
datfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'2010-05-14-00-00-00.dat'
)
repofiles
=
[
fullfile
,
datfile
]
self
.
_callFUT
(
options
,
oldpos
,
repofiles
)
target
=
os
.
path
.
join
(
self
.
_repository_directory
,
gen_filename
(
options
))
self
.
assertEqual
(
open
(
target
,
'rb'
).
read
(),
''
)
self
.
assertEqual
(
open
(
datfile
).
read
(),
'%s %d %d %s
\
n
'
%
(
target
,
oldpos
,
oldpos
,
md5
(
''
).
hexdigest
()))
def
test_w_changes
(
self
):
from
ZODB.scripts.repozo
import
gen_filename
db
=
self
.
_makeDB
()
oldpos
=
db
.
pos
options
=
self
.
_makeOptions
(
file
=
db
.
_file_name
,
gzip
=
False
,
killold
=
False
,
test_now
=
(
2010
,
5
,
14
,
10
,
51
,
22
),
date
=
None
,
)
fullfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'2010-05-14-00-00-00.fs'
)
original
=
open
(
db
.
_file_name
,
'rb'
).
read
()
f
=
open
(
fullfile
,
'wb'
)
f
.
write
(
original
)
f
.
flush
()
f
.
close
()
datfile
=
os
.
path
.
join
(
self
.
_repository_directory
,
'2010-05-14-00-00-00.dat'
)
repofiles
=
[
fullfile
,
datfile
]
db
.
mutate
()
newpos
=
db
.
pos
self
.
_callFUT
(
options
,
oldpos
,
repofiles
)
target
=
os
.
path
.
join
(
self
.
_repository_directory
,
gen_filename
(
options
))
f
=
open
(
db
.
_file_name
,
'rb'
)
f
.
seek
(
oldpos
)
increment
=
f
.
read
()
self
.
assertEqual
(
open
(
target
,
'rb'
).
read
(),
increment
)
self
.
assertEqual
(
open
(
datfile
).
read
(),
'%s %d %d %s
\
n
'
%
(
target
,
oldpos
,
newpos
,
md5
(
increment
).
hexdigest
()))
class
MonteCarloTests
(
unittest
.
TestCase
):
layer
=
ZODB
.
tests
.
util
.
MininalTestLayer
(
'repozo'
)
...
...
@@ -96,7 +669,7 @@ class RepozoTests(unittest.TestCase):
from
ZODB.scripts.repozo
import
main
main
(
argv
)
def
test
Repoz
o
(
self
):
def
test
_via_monte_carl
o
(
self
):
self
.
saved_snapshots
=
[]
# list of (name, time) pairs for copies.
for
i
in
range
(
100
):
...
...
@@ -168,94 +741,18 @@ class RepozoTests(unittest.TestCase):
(
correctpath
,
when
,
' '
.
join
(
argv
)))
self
.
assertEquals
(
fguts
,
gguts
,
msg
)
class
Test_delete_old_backups
(
unittest
.
TestCase
):
_repository_directory
=
None
def
tearDown
(
self
):
if
self
.
_repository_directory
is
not
None
:
from
shutil
import
rmtree
rmtree
(
self
.
_repository_directory
)
def
_callFUT
(
self
,
options
=
None
,
filenames
=
()):
from
ZODB.scripts.repozo
import
delete_old_backups
if
options
is
None
:
options
=
self
.
_makeOptions
(
filenames
)
delete_old_backups
(
options
)
def
_makeOptions
(
self
,
filenames
=
()):
import
tempfile
dir
=
self
.
_repository_directory
=
tempfile
.
mkdtemp
()
for
filename
in
filenames
:
fqn
=
os
.
path
.
join
(
dir
,
filename
)
f
=
open
(
fqn
,
'wb'
)
f
.
write
(
'testing delete_old_backups'
)
f
.
close
()
class
Options
(
object
):
repository
=
dir
return
Options
()
def
test_empty_dir_doesnt_raise
(
self
):
self
.
_callFUT
()
self
.
assertEqual
(
len
(
os
.
listdir
(
self
.
_repository_directory
)),
0
)
def
test_no_repozo_files_doesnt_raise
(
self
):
FILENAMES
=
[
'bogus.txt'
,
'not_a_repozo_file'
]
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
FILENAMES
))
for
name
in
FILENAMES
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_doesnt_remove_current_repozo_files
(
self
):
FILENAMES
=
[
'2009-12-20-10-08-03.fs'
,
'2009-12-20-10-08-03.dat'
]
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
FILENAMES
))
for
name
in
FILENAMES
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_removes_older_repozo_files
(
self
):
OLDER_FULL
=
[
'2009-12-20-00-01-03.fs'
,
'2009-12-20-00-01-03.dat'
]
DELTAS
=
[
'2009-12-21-00-00-01.deltafs'
,
'2009-12-22-00-00-01.deltafs'
]
CURRENT_FULL
=
[
'2009-12-23-00-00-01.fs'
,
'2009-12-23-00-00-01.dat'
]
FILENAMES
=
OLDER_FULL
+
DELTAS
+
CURRENT_FULL
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
CURRENT_FULL
))
for
name
in
OLDER_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
DELTAS
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
CURRENT_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_removes_older_repozo_files_zipped
(
self
):
OLDER_FULL
=
[
'2009-12-20-00-01-03.fsz'
,
'2009-12-20-00-01-03.dat'
]
DELTAS
=
[
'2009-12-21-00-00-01.deltafsz'
,
'2009-12-22-00-00-01.deltafsz'
]
CURRENT_FULL
=
[
'2009-12-23-00-00-01.fsz'
,
'2009-12-23-00-00-01.dat'
]
FILENAMES
=
OLDER_FULL
+
DELTAS
+
CURRENT_FULL
self
.
_callFUT
(
filenames
=
FILENAMES
)
remaining
=
os
.
listdir
(
self
.
_repository_directory
)
self
.
assertEqual
(
len
(
remaining
),
len
(
CURRENT_FULL
))
for
name
in
OLDER_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
DELTAS
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failIf
(
os
.
path
.
isfile
(
fqn
))
for
name
in
CURRENT_FULL
:
fqn
=
os
.
path
.
join
(
self
.
_repository_directory
,
name
)
self
.
failUnless
(
os
.
path
.
isfile
(
fqn
))
def
test_suite
():
return
unittest
.
TestSuite
([
unittest
.
makeSuite
(
RepozoTests
),
unittest
.
makeSuite
(
Test_dofile
),
unittest
.
makeSuite
(
Test_checksum
),
unittest
.
makeSuite
(
Test_copyfile
),
unittest
.
makeSuite
(
Test_concat
),
unittest
.
makeSuite
(
Test_gen_filename
),
unittest
.
makeSuite
(
Test_find_files
),
unittest
.
makeSuite
(
Test_scandat
),
unittest
.
makeSuite
(
Test_delete_old_backups
),
unittest
.
makeSuite
(
Test_do_full_backup
),
unittest
.
makeSuite
(
Test_do_incremental_backup
),
unittest
.
makeSuite
(
MonteCarloTests
),
])
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment