# esistools.py
"""Miscellaneous utility functions useful for dealing with ESIS streams."""

import re
import xml.dom.pulldom
import xml.sax
import xml.sax.handler
import xml.sax.xmlreader


# Matches the longest run of characters at the start of a string that
# contains no backslash, i.e. text that needs no unescaping.
_data_match = re.compile(r"[^\\][^\\]*").match


def decode(s):
    r"""Return *s* with its ESIS backslash escapes expanded.

    The escapes handled are:

    * ``\\``   -- a single backslash
    * ``\n``   -- a newline character
    * ``\%N;`` -- the character whose code point is the decimal number N

    Raises ValueError for any other escape, for a backslash that is the
    last character of *s*, and for a ``\%`` escape that has no closing
    ``;`` or whose number is not a valid decimal integer.
    """
    pieces = []
    while s:
        m = _data_match(s)
        if m:
            pieces.append(m.group())
            s = s[m.end():]
            continue
        # No match means s starts with a backslash.  s[1:2] is "" when that
        # backslash is the final character, which lands in the error branch
        # instead of raising IndexError.
        escape = s[1:2]
        if escape == "\\":
            pieces.append("\\")
            s = s[2:]
        elif escape == "n":
            pieces.append("\n")
            s = s[2:]
        elif escape == "%":
            number, sep, rest = s[2:].partition(";")
            if not sep:
                raise ValueError("can't handle %r" % s)
            # chr() covers the whole Unicode range on Python 3 (the old
            # unichr() builtin no longer exists there).
            pieces.append(chr(int(number)))
            s = rest
        else:
            raise ValueError("can't handle %r" % s)
    return ''.join(pieces)


# Translation table used by encode(): every character with a code point
# below 256 maps to itself, except newline and backslash, which map to
# their two-character escaped forms.  Characters outside this table cannot
# be encoded.
_charmap = {}
for c in range(256):
    _charmap[chr(c)] = chr(c)
_charmap["\n"] = r"\n"
_charmap["\\"] = r"\\"
del c

_null_join = ''.join


def encode(s):
    """Return *s* with newlines and backslashes escaped.

    Each character of *s* is looked up in the module's translation table.
    Raises ValueError (a subclass of the Exception raised previously) when
    *s* contains a character that has no entry in the table; the message
    shows the per-character lookup results, with None marking the
    characters that could not be translated.
    """
    try:
        return _null_join(map(_charmap.get, s))
    except TypeError:
        # str.join() raised because a lookup returned None.  Materialize
        # the lookups as a list so the message shows them.
        raise ValueError("could not encode %r: %r"
                         % (s, list(map(_charmap.get, s))))


class ESISReader(xml.sax.xmlreader.XMLReader):
    """SAX Reader which reads from an ESIS stream.

    No verification of the document structure is performed by the
    reader; a general verifier could be used as the target
    ContentHandler instance.

    Input is supplied incrementally with feed() and finished with
    close().  Each input line consists of a one-character token type
    followed by that token's data; see _handle_token() for the token
    types that are recognized.
    """
    _decl_handler = None
    _lexical_handler = None

    # Identifiers stored by 'p' / 's' tokens and consumed by the next 'N'.
    _public_id = None
    _system_id = None

    _buffer = ""    # unterminated tail of the data passed to feed()
    _is_empty = 0   # set by an 'e' token; consumed by the next '(' token
    _lineno = 0     # number of complete lines processed by feed()
    _started = 0    # true once feed() has sent startDocument()

    def __init__(self, contentHandler=None, errorHandler=None):
        """Create a reader, optionally installing the given handlers."""
        xml.sax.xmlreader.XMLReader.__init__(self)
        # The same dict and Attributes wrapper are reused for every
        # element; the dict is cleared after each startElement() call.
        self._attrs = {}
        self._attributes = Attributes(self._attrs)
        self._locator = Locator()
        self._empties = {}
        if contentHandler:
            self.setContentHandler(contentHandler)
        if errorHandler:
            self.setErrorHandler(errorHandler)

    def get_empties(self):
        """Return a list of the element names that were flagged as empty."""
        return list(self._empties)

    #
    #  XMLReader interface
    #

    def parse(self, source):
        """Disabled: always raises RuntimeError.

        NOTE(review): the unconditional raise was already present and looks
        deliberate -- confirm before removing it.  The statements after it
        are unreachable; they are kept, with their defects corrected, so
        they are usable if the method is ever re-enabled.
        """
        raise RuntimeError(
            "ESISReader.parse() is disabled; use feed() and close()")
        self._locator._public_id = source.getPublicId()
        self._locator._system_id = source.getSystemId()
        fp = source.getByteStream()
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()
        lineno = 0
        while 1:
            token, data = self._get_token(fp)
            if token is None:
                break
            lineno = lineno + 1
            self._locator._lineno = lineno
            self._handle_token(token, data)
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def feed(self, data):
        """Process a chunk of ESIS text.

        startDocument() is sent to the content handler on the first call.
        Every complete (newline-terminated) line is dispatched as a token;
        text after the last newline is buffered and prepended to the data
        of the next call.  An empty line is reported to the error handler
        as a SAXParseException.
        """
        if not self._started:
            handler = self.getContentHandler()
            if handler:
                handler.startDocument()
            self._started = 1
        lines = (self._buffer + data).split("\n")
        # split() always yields at least one item; the last one is the
        # incomplete tail.  Store it before dispatching so the buffer stays
        # a string even if a handler raises part-way through the chunk.
        self._buffer = lines.pop()
        for line in lines:
            self._lineno = self._lineno + 1
            self._locator._lineno = self._lineno
            if line:
                self._handle_token(line[0], line[1:])
            else:
                e = xml.sax.SAXParseException(
                    "ESIS input line contains no token type mark",
                    None, self._locator)
                self.getErrorHandler().error(e)

    def close(self):
        """Send endDocument() to the content handler and drop buffered text."""
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()
        self._buffer = ""

    def _get_token(self, fp):
        """Read one line from *fp* and return it as a (token, data) pair.

        Returns (None, None) at end of input.  The same pair is returned
        after an I/O error or an empty line has been reported to the error
        handler (when that handler returns instead of raising), so a caller
        that unpacks the result always receives two values.
        """
        try:
            line = fp.readline()
        except IOError as err:
            e = xml.sax.SAXException("I/O error reading input stream", err)
            self.getErrorHandler().fatalError(e)
            return None, None
        if not line:
            return None, None
        if line[-1] == "\n":
            line = line[:-1]
        if not line:
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)
            return None, None
        return line[0], line[1:]

    def _handle_token(self, token, data):
        """Dispatch one ESIS token to the appropriate handler.

        Token types handled:

        '-'  character data, passed decoded to characters()
        '('  element start; the pending attributes are passed along and
             then cleared
        ')'  element end
        'A'  attribute for the next element start (IMPLIED ones are skipped)
        '?'  processing instruction
        'N'  notation declaration, reported to the DTD handler with the
             pending public and system identifiers
        'p'  public identifier for the next 'N' token
        's'  system identifier for the next 'N' token
        'e'  marks the next element start as an empty element
        '&', 'C'  accepted and ignored

        Any other token is reported to the error handler.
        """
        handler = self.getContentHandler()
        if token == '-':
            if data and handler:
                handler.characters(decode(data))
        elif token == ')':
            if handler:
                handler.endElement(decode(data))
        elif token == '(':
            if self._is_empty:
                self._empties[data] = 1
                self._is_empty = 0
            if handler:
                handler.startElement(data, self._attributes)
            # The attribute dict is shared with self._attributes; empty it
            # so the next element starts without this element's attributes.
            self._attrs.clear()
        elif token == 'A':
            name, value = data.split(' ', 1)
            if value != "IMPLIED":
                attr_type, value = value.split(' ', 1)
                self._attrs[name] = (decode(value), attr_type)
        elif token == '&':
            # entity reference in SAX?
            pass
        elif token == '?':
            if handler:
                if ' ' in data:
                    target, data = data.split(None, 1)
                else:
                    target, data = data, ""
                handler.processingInstruction(target, decode(data))
        elif token == 'N':
            handler = self.getDTDHandler()
            if handler:
                handler.notationDecl(data, self._public_id, self._system_id)
            self._public_id = None
            self._system_id = None
        elif token == 'p':
            self._public_id = decode(data)
        elif token == 's':
            self._system_id = decode(data)
        elif token == 'e':
            self._is_empty = 1
        elif token == 'C':
            pass
        else:
            e = xml.sax.SAXParseException(
                "unknown ESIS token in event stream",
                None, self._locator)
            self.getErrorHandler().error(e)

    def setContentHandler(self, handler):
        """Install *handler*, moving the document locator to it."""
        old = self.getContentHandler()
        if old:
            old.setDocumentLocator(None)
        if handler:
            handler.setDocumentLocator(self._locator)
        xml.sax.xmlreader.XMLReader.setContentHandler(self, handler)

    def getProperty(self, property):
        """Return the lexical or declaration handler.

        Raises SAXNotRecognizedException for any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            return self._lexical_handler

        elif property == xml.sax.handler.property_declaration_handler:
            return self._decl_handler

        else:
            raise xml.sax.SAXNotRecognizedException("unknown property %r"
                                                    % (property, ))

    def setProperty(self, property, value):
        """Set the lexical or declaration handler.

        The document locator is detached from the handler being replaced
        and attached to the new one.  Raises SAXNotRecognizedException for
        any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            if self._lexical_handler:
                self._lexical_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._lexical_handler = value

        elif property == xml.sax.handler.property_declaration_handler:
            if self._decl_handler:
                self._decl_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._decl_handler = value

        else:
            # SAX exceptions require a message argument.
            raise xml.sax.SAXNotRecognizedException("unknown property %r"
                                                    % (property, ))

    def getFeature(self, feature):
        """Report the namespaces feature as on; defer others to the base."""
        if feature == xml.sax.handler.feature_namespaces:
            return 1
        else:
            return xml.sax.xmlreader.XMLReader.getFeature(self, feature)

    def setFeature(self, feature, enabled):
        """Accept and ignore the namespaces feature; defer others to the base."""
        if feature == xml.sax.handler.feature_namespaces:
            pass
        else:
            xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled)


class Attributes(xml.sax.xmlreader.AttributesImpl):
    """SAX attributes object backed by a dict of (value, type) pairs.

    The dict handed to the constructor is stored as-is (not copied), so
    later changes to it are visible through this object.
    """
    # self._attrs has the form {name: (value, type)}

    def getType(self, name):
        """Return the declared type of attribute *name*."""
        return self._attrs[name][1]

    def getValue(self, name):
        """Return the value of attribute *name*."""
        return self._attrs[name][0]

    def getValueByQName(self, name):
        """Return the value of the attribute with qualified name *name*."""
        return self._attrs[name][0]

    def __getitem__(self, name):
        return self._attrs[name][0]

    def get(self, name, default=None):
        """Return the value of attribute *name*, or *default* if absent."""
        if name in self._attrs:
            return self._attrs[name][0]
        return default

    def items(self):
        """Return a list of (name, value) pairs."""
        return [(name, value)
                for name, (value, _attr_type) in self._attrs.items()]

    def values(self):
        """Return a list of the attribute values."""
        return [value for value, _attr_type in self._attrs.values()]

    def copy(self):
        """Return an independent snapshot of the current attributes.

        The inherited implementation passes the same dict to the new
        object.  ESISReader clears that dict after every startElement()
        call, so a copy has to own its own dict to remain usable.
        """
        return self.__class__(dict(self._attrs))


class Locator(xml.sax.xmlreader.Locator):
    """Locator whose answers come from plain attributes.

    The owner of the locator assigns _lineno, _public_id and _system_id
    directly; the accessor methods only report the current values.
    """

    # Defaults reported until the attributes are assigned on an instance.
    _public_id = _system_id = None
    _lineno = -1

    def getSystemId(self):
        """Return the stored system identifier (None by default)."""
        return self._system_id

    def getPublicId(self):
        """Return the stored public identifier (None by default)."""
        return self._public_id

    def getLineNumber(self):
        """Return the stored line number (-1 by default)."""
        return self._lineno


def parse(stream_or_string, parser=None):
    """Return a pulldom event stream that reads ESIS input.

    *stream_or_string* is either a file name or an already-open stream.
    Any str instance (including instances of str subclasses) is treated as
    a file name and opened for reading in text mode; anything else is used
    as the stream directly.  *parser* defaults to a new ESISReader.

    NOTE: a file opened here is not closed by this function; it remains
    reachable as the ``stream`` attribute of the returned object.
    """
    if isinstance(stream_or_string, str):
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if not parser:
        parser = ESISReader()
    return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)