Commit 2af540e1 authored by Fred Drake's avatar Fred Drake

Paul Prescod <paul@prescod.net>:

SAX interfaces for Python.
parent 2c86ae80
"""Different kinds of SAX Exceptions"""
import sys
if sys.platform[:4] == "java":
from java.lang import Exception
# ===== SAXEXCEPTION =====
class SAXException(Exception):
"""Encapsulate an XML error or warning. This class can contain
basic error or warning information from either the XML parser or
the application: you can subclass it to provide additional
functionality, or to add localization. Note that although you will
receive a SAXException as the argument to the handlers in the
ErrorHandler interface, you are not actually required to throw
the exception; instead, you can simply read the information in
it."""
def __init__(self, msg, exception = None):
"""Creates an exception. The message is required, but the exception
is optional."""
self._msg = msg
self._exception = exception
def getMessage(self):
"Return a message for this exception."
return self._msg
def getException(self):
"Return the embedded exception, or None if there was none."
return self._exception
def __str__(self):
"Create a string representation of the exception."
return self._msg
def __getitem__(self, ix):
"""Avoids weird error messages if someone does exception[ix] by
mistake, since Exception has __getitem__ defined."""
raise NameError("__getitem__")
# ===== SAXPARSEEXCEPTION =====
class SAXParseException(SAXException):
"""Encapsulate an XML parse error or warning.
This exception will include information for locating the error in
the original XML document. Note that although the application will
receive a SAXParseException as the argument to the handlers in the
ErrorHandler interface, the application is not actually required
to throw the exception; instead, it can simply read the
information in it and take a different action.
Since this exception is a subclass of SAXException, it inherits
the ability to wrap another exception."""
def __init__(self, msg, exception, locator):
"Creates the exception. The exception parameter is allowed to be None."
SAXException.__init__(self, msg, exception)
self._locator = locator
def getColumnNumber(self):
"""The column number of the end of the text where the exception
occurred."""
return self._locator.getColumnNumber()
def getLineNumber(self):
"The line number of the end of the text where the exception occurred."
return self._locator.getLineNumber()
def getPublicId(self):
"Get the public identifier of the entity where the exception occurred."
return self._locator.getPublicId()
def getSystemId(self):
"Get the system identifier of the entity where the exception occurred."
return self._locator.getSystemId()
def __str__(self):
"Create a string representation of the exception."
return "%s at %s:%d:%d" % (self._msg,
self.getSystemId(),
self.getLineNumber(),
self.getColumnNumber())
# ===== SAXNOTRECOGNIZEDEXCEPTION =====
class SAXNotRecognizedException(SAXException):
"""Exception class for an unrecognized identifier.
An XMLReader will raise this exception when it is confronted with an
unrecognized feature or property. SAX applications and extensions may
use this class for similar purposes."""
# ===== SAXNOTSUPPORTEDEXCEPTION =====
class SAXNotSupportedException(SAXException):
"""Exception class for an unsupported operation.
An XMLReader will raise this exception when a service it cannot
perform is requested (specifically setting a state or value). SAX
applications and extensions may use this class for similar
purposes."""
"""
SAX driver for the Pyexpat C module. This driver works with
pyexpat.__version__ == '1.5'.
$Id$
"""
# Todo on driver:
# - make it support external entities (wait for pyexpat.c)
# - enable configuration between reset() and feed() calls
# - support lexical events?
# - proper inputsource handling
# - properties and features
# Todo on pyexpat.c:
# - support XML_ExternalEntityParserCreate
# - exceptions in callouts from pyexpat to python code lose position info
version = "0.20"
from string import split
from xml.sax import xmlreader
import pyexpat
import xml.sax
# --- ExpatParser
class ExpatParser( xmlreader.IncrementalParser, xmlreader.Locator ):
"SAX driver for the Pyexpat C module."
def __init__(self, namespaceHandling=0, bufsize=2**16-20):
xmlreader.IncrementalParser.__init__(self, bufsize)
self._source = None
self._parser = None
self._namespaces = namespaceHandling
self._parsing = 0
# XMLReader methods
def parse(self, stream_or_string ):
"Parse an XML document from a URL."
if type( stream_or_string ) == type( "" ):
stream=open( stream_or_string )
else:
stream=stream_or_string
self.reset()
self._cont_handler.setDocumentLocator(self)
try:
xmlreader.IncrementalParser.parse(self, stream)
except pyexpat.error:
error_code = self._parser.ErrorCode
raise xml.sax.SAXParseException(pyexpat.ErrorString(error_code),
None, self)
self._cont_handler.endDocument()
def prepareParser(self, filename=None):
self._source = filename
if self._source != None:
self._parser.SetBase(self._source)
def getFeature(self, name):
"Looks up and returns the state of a SAX2 feature."
raise SAXNotRecognizedException("Feature '%s' not recognized" % name)
def setFeature(self, name, state):
"Sets the state of a SAX2 feature."
raise SAXNotRecognizedException("Feature '%s' not recognized" % name)
def getProperty(self, name):
"Looks up and returns the value of a SAX2 property."
raise SAXNotRecognizedException("Property '%s' not recognized" % name)
def setProperty(self, name, value):
"Sets the value of a SAX2 property."
raise SAXNotRecognizedException("Property '%s' not recognized" % name)
# IncrementalParser methods
def feed(self, data):
if not self._parsing:
self._parsing=1
self.reset()
self._cont_handler.startDocument()
# FIXME: error checking and endDocument()
self._parser.Parse(data, 0)
def close(self):
if self._parsing:
self._cont_handler.endDocument()
self._parsing=0
self._parser.Parse("", 1)
def reset(self):
if self._namespaces:
self._parser = pyexpat.ParserCreate(None, " ")
self._parser.StartElementHandler = self.start_element_ns
self._parser.EndElementHandler = self.end_element_ns
else:
self._parser = pyexpat.ParserCreate()
self._parser.StartElementHandler = self._cont_handler.startElement
self._parser.EndElementHandler = self._cont_handler.endElement
self._parser.ProcessingInstructionHandler = \
self._cont_handler.processingInstruction
self._parser.CharacterDataHandler = self._cont_handler.characters
self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
self._parser.NotationDeclHandler = self.notation_decl
self._parser.StartNamespaceDeclHandler = self.start_namespace_decl
self._parser.EndNamespaceDeclHandler = self.end_namespace_decl
# self._parser.CommentHandler =
# self._parser.StartCdataSectionHandler =
# self._parser.EndCdataSectionHandler =
# self._parser.DefaultHandler =
# self._parser.DefaultHandlerExpand =
# self._parser.NotStandaloneHandler =
self._parser.ExternalEntityRefHandler = self.external_entity_ref
# Locator methods
def getColumnNumber(self):
return self._parser.ErrorColumnNumber
def getLineNumber(self):
return self._parser.ErrorLineNumber
def getPublicId(self):
return self._source.getPublicId()
def getSystemId(self):
return self._parser.GetBase()
# internal methods
# event handlers
def start_element(self, name, attrs):
self._cont_handler.startElement(name,
xmlreader.AttributesImpl(attrs, attrs))
def end_element(self, name):
self._cont_handler.endElement(name)
def start_element_ns(self, name, attrs):
pair = split(name)
if len(pair) == 1:
tup = (None, name, None)
else:
tup = pair+[None] # prefix is not implemented yet!
self._cont_handler.startElement(tup,
xmlreader.AttributesImpl(attrs, None))
def end_element_ns(self, name):
pair = split(name)
if len(pair) == 1:
name = (None, name, None)
else:
name = pair+[None] # prefix is not implemented yet!
self._cont_handler.endElement(name)
def processing_instruction(self, target, data):
self._cont_handler.processingInstruction(target, data)
def character_data(self, data):
self._cont_handler.characters(data)
def start_namespace_decl(self, prefix, uri):
self._cont_handler.startPrefixMapping(prefix, uri)
def end_namespace_decl(self, prefix):
self._cont_handler.endPrefixMapping(prefix)
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
self._dtd_handler.unparsedEntityDecl(name, pubid, sysid, notation_name)
def notation_decl(self, name, base, sysid, pubid):
self._dtd_handler.notationDecl(name, pubid, sysid)
def external_entity_ref(self, context, base, sysid, pubid):
assert 0 # not implemented
source = self._ent_handler.resolveEntity(pubid, sysid)
source = saxutils.prepare_input_source(source)
# FIXME: create new parser, stack self._source and self._parser
# FIXME: reuse code from self.parse(...)
return 1
# ---
def create_parser(*args, **kwargs):
return apply( ExpatParser, args, kwargs )
# ---
if __name__ == "__main__":
import xml.sax
p = create_parser()
p.setContentHandler(xml.sax.XMLGenerator())
p.setErrorHandler(xml.sax.ErrorHandler())
p.parse("../../../hamlet.xml")
This diff is collapsed.
"""
A library of useful helper classes to the sax classes, for the
convenience of application and driver writers.
$Id$
"""
import types, string, sys, urllib
import handler
def escape(data, entities = {}):
"""Escape &, <, and > in a string of data.
You can escape other strings of data by passing a dictionary as
the optional entities parameter. The keys and values must all be
strings; each key will be replaced with its corresponding value.
"""
data = string.replace(data, "&", "&amp;")
data = string.replace(data, "<", "&lt;")
data = string.replace(data, ">", "&gt;")
for chars, entity in entities.items():
data = string.replace(data, chars, entity)
return data
class XMLGenerator(handler.ContentHandler):
def __init__(self, out = sys.stdout):
handler.ContentHandler.__init__(self)
self._out = out
# ContentHandler methods
def startDocument(self):
self._out.write('<?xml version="1.0" encoding="iso-8859-1"?>\n')
def startPrefixMapping(self, prefix, uri):
pass
def endPrefixMapping(self, prefix):
pass
def startElement(self, name, attrs):
if type(name)==type(()):
uri, localname, prefix=name
name="%s:%s"%(prefix,localname)
self._out.write('<' + name)
for (name, value) in attrs.items():
self._out.write(' %s="%s"' % (name, escape(value)))
self._out.write('>')
def endElement(self, name):
# FIXME: not namespace friendly yet
self._out.write('</%s>' % name)
def characters(self, content):
self._out.write(escape(content))
def ignorableWhitespace(self, content):
self._out.write(content)
def processingInstruction(self, target, data):
self._out.write('<?%s %s?>' % (target, data))
class XMLFilterBase:
"""This class is designed to sit between an XMLReader and the
client application's event handlers. By default, it does nothing
but pass requests up to the reader and events on to the handlers
unmodified, but subclasses can override specific methods to modify
the event stream or the configuration requests as they pass
through."""
# ErrorHandler methods
def error(self, exception):
self._err_handler.error(exception)
def fatalError(self, exception):
self._err_handler.fatalError(exception)
def warning(self, exception):
self._err_handler.warning(exception)
# ContentHandler methods
def setDocumentLocator(self, locator):
self._cont_handler.setDocumentLocator(locator)
def startDocument(self):
self._cont_handler.startDocument()
def endDocument(self):
self._cont_handler.endDocument()
def startPrefixMapping(self, prefix, uri):
self._cont_handler.startPrefixMapping(prefix, uri)
def endPrefixMapping(self, prefix):
self._cont_handler.endPrefixMapping(prefix)
def startElement(self, name, attrs):
self._cont_handler.startElement(name, attrs)
def endElement(self, name, qname):
self._cont_handler.endElement(name, qname)
def characters(self, content):
self._cont_handler.characters(content)
def ignorableWhitespace(self, chars, start, end):
self._cont_handler.ignorableWhitespace(chars, start, end)
def processingInstruction(self, target, data):
self._cont_handler.processingInstruction(target, data)
def skippedEntity(self, name):
self._cont_handler.skippedEntity(name)
# DTDHandler methods
def notationDecl(self, name, publicId, systemId):
self._dtd_handler.notationDecl(name, publicId, systemId)
def unparsedEntityDecl(self, name, publicId, systemId, ndata):
self._dtd_handler.unparsedEntityDecl(name, publicId, systemId, ndata)
# EntityResolver methods
def resolveEntity(self, publicId, systemId):
self._ent_handler.resolveEntity(publicId, systemId)
# XMLReader methods
def parse(self, source):
self._parent.setContentHandler(self)
self._parent.setErrorHandler(self)
self._parent.setEntityResolver(self)
self._parent.setDTDHandler(self)
self._parent.parse(source)
def setLocale(self, locale):
self._parent.setLocale(locale)
def getFeature(self, name):
return self._parent.getFeature(name)
def setFeature(self, name, state):
self._parent.setFeature(name, state)
def getProperty(self, name):
return self._parent.getProperty(name)
def setProperty(self, name, value):
self._parent.setProperty(name, value)
import handler
"""An XML Reader is the SAX 2 name for an XML parser. XML Parsers
should be based on this code. """
# ===== XMLREADER =====
class XMLReader:
def __init__(self):
self._cont_handler = handler.ContentHandler()
#self._dtd_handler = handler.DTDHandler()
#self._ent_handler = handler.EntityResolver()
self._err_handler = handler.ErrorHandler()
def parse(self, source):
"Parse an XML document from a system identifier or an InputSource."
raise NotImplementedError("This method must be implemented!")
def getContentHandler(self):
"Returns the current ContentHandler."
return self._cont_handler
def setContentHandler(self, handler):
"Registers a new object to receive document content events."
self._cont_handler = handler
def getDTDHandler(self):
"Returns the current DTD handler."
return self._dtd_handler
def setDTDHandler(self, handler):
"Register an object to receive basic DTD-related events."
self._dtd_handler = handler
def getEntityResolver(self):
"Returns the current EntityResolver."
return self._ent_handler
def setEntityResolver(self, resolver):
"Register an object to resolve external entities."
self._ent_handler = resolver
def getErrorHandler(self):
"Returns the current ErrorHandler."
return self._err_handler
def setErrorHandler(self, handler):
"Register an object to receive error-message events."
self._err_handler = handler
def setLocale(self, locale):
"""Allow an application to set the locale for errors and warnings.
SAX parsers are not required to provide localisation for errors
and warnings; if they cannot support the requested locale,
however, they must throw a SAX exception. Applications may
request a locale change in the middle of a parse."""
raise SAXNotSupportedException("Locale support not implemented")
def getFeature(self, name):
"Looks up and returns the state of a SAX2 feature."
raise SAXNotRecognizedException("Feature '%s' not recognized" % name)
def setFeature(self, name, state):
"Sets the state of a SAX2 feature."
raise SAXNotRecognizedException("Feature '%s' not recognized" % name)
def getProperty(self, name):
"Looks up and returns the value of a SAX2 property."
raise SAXNotRecognizedException("Property '%s' not recognized" % name)
def setProperty(self, name, value):
"Sets the value of a SAX2 property."
raise SAXNotRecognizedException("Property '%s' not recognized" % name)
class IncrementalParser(XMLReader):
"""This interface adds three extra methods to the XMLReader
interface that allow XML parsers to support incremental
parsing. Support for this interface is optional, since not all
underlying XML parsers support this functionality.
When the parser is instantiated it is ready to begin accepting
data from the feed method immediately. After parsing has been
finished with a call to close the reset method must be called to
make the parser ready to accept new data, either from feed or
using the parse method.
Note that these methods must _not_ be called during parsing, that
is, after parse has been called and before it returns.
By default, the class also implements the parse method of the XMLReader
interface using the feed, close and reset methods of the
IncrementalParser interface as a convenience to SAX 2.0 driver
writers."""
def __init__(self, bufsize=2**16 ):
self._bufsize=bufsize
XMLReader.__init__( self )
def parse(self, source):
self.prepareParser(source)
#FIXME: do some type checking: could be already stream, URL or
# filename
inf=open( source )
buffer = inf.read(self._bufsize)
while buffer != "":
self.feed(buffer)
buffer = inf.read(self._bufsize)
self.close()
self.reset()
def feed(self, data):
"""This method gives the raw XML data in the data parameter to
the parser and makes it parse the data, emitting the
corresponding events. It is allowed for XML constructs to be
split across several calls to feed.
feed may raise SAXException."""
raise NotImplementedError("This method must be implemented!")
def prepareParser(self, source):
"""This method is called by the parse implementation to allow
the SAX 2.0 driver to prepare itself for parsing."""
raise NotImplementedError("prepareParser must be overridden!")
def close(self):
"""This method is called when the entire XML document has been
passed to the parser through the feed method, to notify the
parser that there are no more data. This allows the parser to
do the final checks on the document and empty the internal
data buffer.
The parser will not be ready to parse another document until
the reset method has been called.
close may raise SAXException."""
raise NotImplementedError("This method must be implemented!")
def reset(self):
"""This method is called after close has been called to reset
the parser so that it is ready to parse new documents. The
results of calling parse or feed after close without calling
reset are undefined."""
raise NotImplementedError("This method must be implemented!")
# ===== LOCATOR =====
class Locator:
"""Interface for associating a SAX event with a document
location. A locator object will return valid results only during
calls to DocumentHandler methods; at any other time, the
results are unpredictable."""
def getColumnNumber(self):
"Return the column number where the current event ends."
return -1
def getLineNumber(self):
"Return the line number where the current event ends."
return -1
def getPublicId(self):
"Return the public identifier for the current event."
return None
def getSystemId(self):
"Return the system identifier for the current event."
return None
# --- AttributesImpl
class AttributesImpl:
def __init__(self, attrs, rawnames):
self._attrs = attrs
self._rawnames = rawnames
def getLength(self):
return len(self._attrs)
def getType(self, name):
return "CDATA"
def getValue(self, name):
return self._attrs[name]
def getValueByQName(self, name):
return self._attrs[self._rawnames[name]]
def getNameByQName(self, name):
return self._rawnames[name]
def getNames(self):
return self._attrs.keys()
def getQNames(self):
return self._rawnames.keys()
def __len__(self):
return len(self._attrs)
def __getitem__(self, name):
return self._attrs[name]
def keys(self):
return self._attrs.keys()
def has_key(self, name):
return self._attrs.has_key(name)
def get(self, name, alternative=None):
return self._attrs.get(name, alternative)
def copy(self):
return self.__class__(self._attrs, self._rawnames)
def items(self):
return self._attrs.items()
def values(self):
return self._attrs.values()
def _test():
XMLReader()
IncrementalParser()
Locator()
AttributesImpl()
if __name__=="__main__":
_test()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment