Commit 8d55787a authored by Tom Niget's avatar Tom Niget

Add scanfs test

parent 32799998
# coding: utf-8
import hashlib
import io
import json
import os
import stat
from typing import Self
class StatResult:
st_mode: int
st_ino: int
st_dev: int
st_nlink: int
st_uid: int
st_gid: int
st_size: int
st_atime: float
st_mtime: float
st_ctime: float
def __init__(self):
pass
def stat_result_to_ours(stat_result: os.stat_result):
res = StatResult()
res.st_mode = stat_result.st_mode
res.st_ino = stat_result.st_ino
res.st_dev = stat_result.st_dev
res.st_nlink = stat_result.st_nlink
res.st_uid = stat_result.st_uid
res.st_gid = stat_result.st_gid
res.st_size = stat_result.st_size
res.st_atime = stat_result.st_atime
res.st_mtime = stat_result.st_mtime
res.st_ctime = stat_result.st_ctime
return res
class Tree:
childs: dict[str, Self]
stat: StatResult
ignored: bool
symlink_target: str
md5: str
sha1: str
sha256: str
sha512: str
def __init__(self, stat):
self.stat = stat
self.childs = {}
def compute_hashes(entry_path, tree: Tree):
with open(entry_path, "rb") as f:
md5 = hashlib.md5()
sha1 = hashlib.sha1()
sha256 = hashlib.sha256()
sha512 = hashlib.sha512()
while True:
data = f.read(io.DEFAULT_BUFFER_SIZE)
md5.update(data)
sha1.update(data)
sha256.update(data)
sha512.update(data)
if len(data) < io.DEFAULT_BUFFER_SIZE:
break
tree.md5 = md5.hexdigest()
tree.sha1 = sha1.hexdigest()
tree.sha256 = sha256.hexdigest()
tree.sha512 = sha512.hexdigest()
def construct_fs_tree(cur_tree: Tree, path: str, dev_whitelist, ignored_dirs: list[str]):
path_stat = cur_tree.stat
if not path_stat.st_dev in dev_whitelist:
return cur_tree
for dir in ignored_dirs:
if path.startswith(dir):
cur_tree.ignored = True
return cur_tree
try:
with os.scandir(path) as it:
for entry in it:
try:
entry_path = os.fsdecode(entry.path)
entry_name = os.fsdecode(entry.name)
entry_stat: os.stat_result
try:
entry_stat = os.stat(entry_path) # todo: follow_symlinks=False
except Exception:
# entry_stat = None
pass
cur_tree.childs[entry_name] = Tree(stat_result_to_ours(entry_stat))
if stat.S_ISDIR(entry_stat.st_mode):
construct_fs_tree(cur_tree.childs[entry_name],
entry_path, dev_whitelist, ignored_dirs)
elif stat.S_ISREG(entry_stat.st_mode):
compute_hashes(entry_path, cur_tree.childs[entry_name])
elif stat.S_ISLNK(entry_stat.st_mode):
cur_tree.childs[entry_name].symlink_target = os.readlink(
entry_path)
except Exception:
pass
except Exception:
pass
return cur_tree
def main():
ignored_dirs = ["/opt/slapgrid", "/srv/slapgrid"]
dev_whitelist = []
for path in [".", "/", "/boot"]:
try:
dev_whitelist.append(
os.stat(path).st_dev) # todo: follow_symlinks=False
except FileNotFoundError:
pass
tree = construct_fs_tree(Tree(stat_result_to_ours(os.stat("."))), ".", dev_whitelist, ignored_dirs)
with open('result.json', 'w') as fp:
json.dump(tree, fp, default=vars)
if __name__ == "__main__":
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment