Commit 30bb58f0 authored by Philip Thiem's avatar Philip Thiem

Added a legacy fallback test

Added in code to after a deprecation warning parse the .svn files
Should also parse externals.

--HG--
extra : rebase_source : 9dd3bcf22cb56eb0826051f9e477f155e47cdbf6
parent 8c645eb0
[ssl:sys_platform=='win32'] [ssl:python_version in '2.4, 2.5']
wincertstore==0.1 ssl==1.16
[ssl:sys_platform=='win32' and python_version=='2.4'] [ssl:sys_platform=='win32' and python_version=='2.4']
ctypes==1.0.2 ctypes==1.0.2
...@@ -9,5 +9,5 @@ ctypes==1.0.2 ...@@ -9,5 +9,5 @@ ctypes==1.0.2
[certs] [certs]
certifi==0.0.8 certifi==0.0.8
[ssl:python_version in '2.4, 2.5'] [ssl:sys_platform=='win32']
ssl==1.16 wincertstore==0.1
\ No newline at end of file \ No newline at end of file
...@@ -6,7 +6,9 @@ import xml.dom.pulldom ...@@ -6,7 +6,9 @@ import xml.dom.pulldom
import shlex import shlex
import locale import locale
import unicodedata import unicodedata
import warnings
from setuptools.compat import unicode, bytes from setuptools.compat import unicode, bytes
from xml.sax.saxutils import unescape
try: try:
import urlparse import urlparse
...@@ -23,6 +25,8 @@ from subprocess import Popen as _Popen, PIPE as _PIPE ...@@ -23,6 +25,8 @@ from subprocess import Popen as _Popen, PIPE as _PIPE
# http://bugs.python.org/issue8557 # http://bugs.python.org/issue8557
# http://stackoverflow.com/questions/5658622/ # http://stackoverflow.com/questions/5658622/
# python-subprocess-popen-environment-path # python-subprocess-popen-environment-path
def _run_command(args, stdout=_PIPE, stderr=_PIPE): def _run_command(args, stdout=_PIPE, stderr=_PIPE):
#regarding the shell argument, see: http://bugs.python.org/issue8557 #regarding the shell argument, see: http://bugs.python.org/issue8557
try: try:
...@@ -63,10 +67,10 @@ def _get_xml_data(decoded_str): ...@@ -63,10 +67,10 @@ def _get_xml_data(decoded_str):
return data return data
def joinpath(prefix, suffix): def joinpath(prefix, *suffix):
if not prefix or prefix == '.': if not prefix or prefix == '.':
return suffix return os.path.join(*suffix)
return os.path.join(prefix, suffix) return os.path.join(prefix, *suffix)
def fsencode(path): def fsencode(path):
...@@ -87,6 +91,7 @@ def fsencode(path): ...@@ -87,6 +91,7 @@ def fsencode(path):
return path return path
def fsdecode(path): def fsdecode(path):
"Path must be unicode or in file system encoding already" "Path must be unicode or in file system encoding already"
encoding = sys.getfilesystemencoding() encoding = sys.getfilesystemencoding()
...@@ -98,6 +103,7 @@ def fsdecode(path): ...@@ -98,6 +103,7 @@ def fsdecode(path):
return unicodedata.normalize('NFC', path) return unicodedata.normalize('NFC', path)
def consoledecode(text): def consoledecode(text):
encoding = locale.getpreferredencoding() encoding = locale.getpreferredencoding()
return text.decode(encoding) return text.decode(encoding)
...@@ -130,9 +136,7 @@ def parse_externals_xml(decoded_str, prefix=''): ...@@ -130,9 +136,7 @@ def parse_externals_xml(decoded_str, prefix=''):
if event == 'START_ELEMENT' and node.nodeName == 'target': if event == 'START_ELEMENT' and node.nodeName == 'target':
doc.expandNode(node) doc.expandNode(node)
path = os.path.normpath(node.getAttribute('path')) path = os.path.normpath(node.getAttribute('path'))
log.warn('')
log.warn('PRE: %s' % prefix)
log.warn('PTH: %s' % path)
if os.path.normcase(path).startswith(prefix): if os.path.normcase(path).startswith(prefix):
path = path[len(prefix)+1:] path = path[len(prefix)+1:]
...@@ -154,7 +158,7 @@ def parse_external_prop(lines): ...@@ -154,7 +158,7 @@ def parse_external_prop(lines):
""" """
externals = [] externals = []
for line in lines.splitlines(): for line in lines.splitlines():
line = line.lstrip() #there might be a "\ " line = line.lstrip() # there might be a "\ "
if not line: if not line:
continue continue
...@@ -178,6 +182,26 @@ def parse_external_prop(lines): ...@@ -178,6 +182,26 @@ def parse_external_prop(lines):
return externals return externals
def parse_prop_file(filename, key):
found = False
f = open(filename, 'rt')
data = ''
try:
for line in iter(f.readline, ''): # can't use direct iter!
parts = line.split()
if len(parts) == 2:
kind, length = parts
data = f.read(int(length))
if kind == 'K' and data == key:
found = True
elif kind == 'V' and found:
break
finally:
f.close()
return data
class SvnInfo(object): class SvnInfo(object):
''' '''
Generic svn_info object. No has little knowledge of how to extract Generic svn_info object. No has little knowledge of how to extract
...@@ -203,14 +227,24 @@ class SvnInfo(object): ...@@ -203,14 +227,24 @@ class SvnInfo(object):
@classmethod @classmethod
def load(cls, dirname=''): def load(cls, dirname=''):
code, data = _run_command(['svn', 'info', os.path.normpath(dirname)]) normdir = os.path.normpath(dirname)
code, data = _run_command(['svn', 'info', normdir])
has_svn = os.path.isdir(os.path.join(normdir, '.svn'))
svn_version = tuple(cls.get_svn_version().split('.')) svn_version = tuple(cls.get_svn_version().split('.'))
try:
base_svn_version = tuple(int(x) for x in svn_version[:2]) base_svn_version = tuple(int(x) for x in svn_version[:2])
if code and base_svn_version: except ValueError:
#Not an SVN repository or compatible one base_svn_version = tuple()
return SvnInfo(dirname)
elif base_svn_version < (1, 3): if has_svn and (code or not base_svn_version
log.warn('Insufficent version of SVN found') or base_svn_version < (1, 3)):
log.warn('Fallback onto .svn parsing')
warnings.warn(("No SVN 1.3+ command found: falling back "
"on pre 1.7 .svn parsing"), DeprecationWarning)
return SvnFileInfo(dirname)
elif not has_svn:
log.warn('Not SVN Repository')
return SvnInfo(dirname) return SvnInfo(dirname)
elif base_svn_version < (1, 5): elif base_svn_version < (1, 5):
return Svn13Info(dirname) return Svn13Info(dirname)
...@@ -259,7 +293,7 @@ class SvnInfo(object): ...@@ -259,7 +293,7 @@ class SvnInfo(object):
Iterate over the non-deleted file entries in the repository path Iterate over the non-deleted file entries in the repository path
''' '''
for item, kind in self.entries: for item, kind in self.entries:
if kind.lower()=='file': if kind.lower() == 'file':
yield item yield item
def iter_dirs(self, include_root=True): def iter_dirs(self, include_root=True):
...@@ -269,7 +303,7 @@ class SvnInfo(object): ...@@ -269,7 +303,7 @@ class SvnInfo(object):
if include_root: if include_root:
yield self.path yield self.path
for item, kind in self.entries: for item, kind in self.entries:
if kind.lower()=='dir': if kind.lower() == 'dir':
yield item yield item
def get_entries(self): def get_entries(self):
...@@ -278,6 +312,7 @@ class SvnInfo(object): ...@@ -278,6 +312,7 @@ class SvnInfo(object):
def get_externals(self): def get_externals(self):
return [] return []
class Svn13Info(SvnInfo): class Svn13Info(SvnInfo):
def get_entries(self): def get_entries(self):
code, data = _run_command(['svn', 'info', '-R', '--xml', self.path]) code, data = _run_command(['svn', 'info', '-R', '--xml', self.path])
...@@ -316,6 +351,73 @@ class Svn15Info(Svn13Info): ...@@ -316,6 +351,73 @@ class Svn15Info(Svn13Info):
return parse_externals_xml(lines, prefix=os.path.abspath(self.path)) return parse_externals_xml(lines, prefix=os.path.abspath(self.path))
class SvnFileInfo(SvnInfo):
def __init__(self, path=''):
super(SvnFileInfo, self).__init__(path)
self._directories = None
self._revision = None
def _walk_svn(self, base):
entry_file = joinpath(base, '.svn', 'entries')
if os.path.isfile(entry_file):
entries = SVNEntriesFile.load(base)
yield (base, False, entries.parse_revision())
for path in entries.get_undeleted_records():
path = joinpath(base, path)
if os.path.isfile(path):
yield (path, True, None)
elif os.path.isdir(path):
for item in self._walk_svn(path):
yield item
def _build_entries(self):
dirs = list()
files = list()
rev = 0
for path, isfile, dir_rev in self._walk_svn(self.path):
if isfile:
files.append(path)
else:
dirs.append(path)
rev = max(rev, dir_rev)
self._directories = dirs
self._entries = files
self._revision = rev
def get_entries(self):
if self._entries is None:
self._build_entries()
return self._entries
def get_revision(self):
if self._revision is None:
self._build_entries()
return self._revision
def get_externals(self):
if self._directories is None:
self._build_entries()
prop_files = [['.svn', 'dir-prop-base'],
['.svn', 'dir-props']]
externals = []
for dirname in self._directories:
prop_file = None
for rel_parts in prop_files:
filename = joinpath(dirname, *rel_parts)
if os.path.isfile(filename):
prop_file = filename
if prop_file is not None:
ext_prop = parse_prop_file(prop_file, 'svn:externals')
externals.extend(parse_external_prop(ext_prop))
return externals
def svn_finder(dirname=''): def svn_finder(dirname=''):
#combined externals due to common interface #combined externals due to common interface
#combined externals and entries due to lack of dir_props in 1.7 #combined externals and entries due to lack of dir_props in 1.7
...@@ -328,6 +430,110 @@ def svn_finder(dirname=''): ...@@ -328,6 +430,110 @@ def svn_finder(dirname=''):
for sub_path in sub_info.iter_files(): for sub_path in sub_info.iter_files():
yield fsencode(sub_path) yield fsencode(sub_path)
class SVNEntriesFile(object):
def __init__(self, data):
self.data = data
@classmethod
def load(class_, base):
filename = os.path.join(base, '.svn', 'entries')
f = open(filename)
try:
result = SVNEntriesFile.read(f)
finally:
f.close()
return result
@classmethod
def read(class_, fileobj):
data = fileobj.read()
is_xml = data.startswith('<?xml')
class_ = [SVNEntriesFileText, SVNEntriesFileXML][is_xml]
return class_(data)
def parse_revision(self):
all_revs = self.parse_revision_numbers() + [0]
return max(all_revs)
class SVNEntriesFileText(SVNEntriesFile):
known_svn_versions = {
'1.4.x': 8,
'1.5.x': 9,
'1.6.x': 10,
}
def __get_cached_sections(self):
return self.sections
def get_sections(self):
SECTION_DIVIDER = '\f\n'
sections = self.data.split(SECTION_DIVIDER)
sections = [x for x in map(str.splitlines, sections)]
try:
# remove the SVN version number from the first line
svn_version = int(sections[0].pop(0))
if not svn_version in self.known_svn_versions.values():
log.warn("Unknown subversion verson %d", svn_version)
except ValueError:
return
self.sections = sections
self.get_sections = self.__get_cached_sections
return self.sections
def is_valid(self):
return bool(self.get_sections())
def get_url(self):
return self.get_sections()[0][4]
def parse_revision_numbers(self):
revision_line_number = 9
rev_numbers = [
int(section[revision_line_number])
for section in self.get_sections()
if (len(section) > revision_line_number
and section[revision_line_number])
]
return rev_numbers
def get_undeleted_records(self):
undeleted = lambda s: s and s[0] and (len(s) < 6 or s[5] != 'delete')
result = [
section[0]
for section in self.get_sections()
if undeleted(section)
]
return result
class SVNEntriesFileXML(SVNEntriesFile):
def is_valid(self):
return True
def get_url(self):
"Get repository URL"
urlre = re.compile('url="([^"]+)"')
return urlre.search(self.data).group(1)
def parse_revision_numbers(self):
revre = re.compile(r'committed-rev="(\d+)"')
return [
int(m.group(1))
for m in revre.finditer(self.data)
]
def get_undeleted_records(self):
entries_pattern = \
re.compile(r'name="([^"]+)"(?![^>]+deleted="true")', re.I)
results = [
unescape(match.group(1))
for match in entries_pattern.finditer(self.data)
]
return results
if __name__ == '__main__': if __name__ == '__main__':
for name in svn_finder(sys.argv[1]): for name in svn_finder(sys.argv[1]):
print(name) print(name)
...@@ -48,6 +48,27 @@ class TestEggInfo(unittest.TestCase): ...@@ -48,6 +48,27 @@ class TestEggInfo(unittest.TestCase):
rev = egg_info.egg_info.get_svn_revision() rev = egg_info.egg_info.get_svn_revision()
self.assertEqual(rev, '89000') self.assertEqual(rev, '89000')
def test_version_10_format_legacy_parser(self):
"""
"""
path_variable = None
for env in os.environ:
if env.lower() == 'path':
path_variable = env
if path_variable is None:
self.skipTest('Cannot figure out how to modify path')
old_path = os.environ[path_variable]
os.environ[path_variable] = ''
try:
self._write_entries(ENTRIES_V10)
rev = egg_info.egg_info.get_svn_revision()
finally:
os.environ[path_variable] = old_path
self.assertEqual(rev, '89000')
def test_suite(): def test_suite():
return unittest.defaultTestLoader.loadTestsFromName(__name__) return unittest.defaultTestLoader.loadTestsFromName(__name__)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment