# Source code for astropy.utils.xml.iterparser

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module includes a fast iterator-based XML parser.
"""

# STDLIB
import contextlib
import io
import sys

# ASTROPY
from astropy.utils import data

__all__ = ["get_xml_iterator", "get_xml_encoding", "xml_readlines"]


@contextlib.contextmanager
def _convert_to_fd_or_read_function(fd):
    """
    Context manager yielding either a raw file object or a read callable.

    This is intended for handing input to C code, where:

       - a real file object can be used directly as a C file object,
         avoiding Python overhead, and

       - anything else is most conveniently consumed through a plain
         Python function that returns chunks of data.

    The behavior is deliberately quirky, which is why this helper is
    private.  For a more generally useful variant, see
    `astropy.utils.misc.get_readable_fileobj`.

    Parameters
    ----------
    fd : object
        One of:

            - a callable that reads from a stream; it is yielded
              verbatim.

            - a file object, a file path, or an object with a
              :meth:`read` method.  It is opened in binary mode through
              ``data.get_readable_fileobj``.  If the resulting object is
              an `io.FileIO` instance (and the platform is not Windows)
              that object itself is yielded; otherwise its ``read``
              method is yielded.

    Yields
    ------
    fd : context-dependent
        See above.  On Windows the ``read`` method is always yielded
        for non-callable input.
    """
    # Callables are assumed to already be stream readers.
    if callable(fd):
        yield fd
        return

    on_windows = sys.platform.startswith("win")

    with data.get_readable_fileobj(fd, encoding="binary") as opened:
        # Only a genuine FileIO on a non-Windows platform is passed
        # through as a file object; everything else goes via ``read``.
        if not on_windows and isinstance(opened, io.FileIO):
            yield opened
        else:
            yield opened.read


def _fast_iterparse(fd, buffersize=2**10):
    """
    Pure-Python, expat-based generator of XML start/end events.

    Parameters
    ----------
    fd : callable or object with a ``read`` method
        Source of the XML data.  A callable is invoked as
        ``fd(buffersize)``; otherwise ``fd.read(buffersize)`` is used.

    buffersize : int, optional
        Size argument passed to each read call.

    Yields
    ------
    event : tuple
        4-tuples ``(start, tag, data, pos)``.  For a start element,
        ``start`` is `True` and ``data`` is the attributes passed by
        expat.  For an end element, ``start`` is `False` and ``data`` is
        the stripped character data collected since the most recent
        start tag (the text buffer is only reset when an element
        starts).  ``pos`` is ``(line, column)`` as reported by the
        parser at the time of the event.
    """
    from xml.parsers import expat

    read_chunk = fd if callable(fd) else fd.read

    pending = []  # events produced by the handlers, not yet yielded
    chars = []  # character data fragments seen since the last start tag

    def _position():
        return (xml_parser.CurrentLineNumber, xml_parser.CurrentColumnNumber)

    def on_start(tag, attributes):
        pending.append((True, tag, attributes, _position()))
        chars.clear()

    def on_end(tag):
        content = "".join(chars).strip()
        pending.append((False, tag, content, _position()))

    xml_parser = expat.ParserCreate()
    xml_parser.specified_attributes = True
    xml_parser.StartElementHandler = on_start
    xml_parser.EndElementHandler = on_end
    xml_parser.CharacterDataHandler = chars.append
    feed = xml_parser.Parse

    while True:
        chunk = read_chunk(buffersize)
        if not chunk:
            break
        feed(chunk, False)
        yield from pending
        pending.clear()

    # Tell expat the document is complete and flush any final events.
    feed("", True)
    yield from pending


# Keep a handle on the pure-Python parser defined above, then prefer the
# C implementation of the iterparser whenever it can be imported.
_slow_iterparse = _fast_iterparse
try:
    from . import _iterparser
except ImportError:
    # No compiled extension available; the Python parser stays in place.
    pass
else:
    _fast_iterparse = _iterparser.IterParser


@contextlib.contextmanager
def get_xml_iterator(source, _debug_python_based_parser=False):
    """
    Returns an iterator over the elements of an XML file.

    The iterator doesn't ever build a tree, so it is much more memory
    and time efficient than the alternative in ``cElementTree``.

    Parameters
    ----------
    source : path-like, readable file-like, or callable
        Handle that contains the data or function that reads it.
        If a function or callable object, it must directly read from a stream.
        Non-callable objects must define a ``read`` method.

    _debug_python_based_parser : bool, optional
        When true, use ``_slow_iterparse`` instead of ``_fast_iterparse``
        to produce the events.

    Returns
    -------
    parts : iterator

        The iterator returns 4-tuples (*start*, *tag*, *data*, *pos*):

            - *start*: when `True` is a start element event, otherwise
              an end element event.

            - *tag*: The name of the element

            - *data*: Depends on the value of *start*:

                - if *start* == `True`, data is a dictionary of
                  attributes

                - if *start* == `False`, data is a string containing
                  the text content of the element

            - *pos*: Tuple (*line*, *col*) indicating the source of the
              event.
    """
    with _convert_to_fd_or_read_function(source) as fd:
        if _debug_python_based_parser:
            parse = _slow_iterparse
        else:
            parse = _fast_iterparse
        # The yield stays inside the ``with`` so the underlying source
        # remains open while the caller consumes the iterator.
        yield iter(parse(fd))


def get_xml_encoding(source):
    """
    Determine the encoding of an XML file by reading its header.

    Parameters
    ----------
    source : path-like, readable file-like, or callable
        Handle that contains the data or function that reads it.
        If a function or callable object, it must directly read from a stream.
        Non-callable objects must define a ``read`` method.

    Returns
    -------
    encoding : str
        The ``encoding`` value of the first event, or ``"utf-8"`` when
        that value is missing or empty.

    Raises
    ------
    OSError
        If the first event from the iterator is not a start event with
        the tag ``"xml"``.
    """
    with get_xml_iterator(source) as iterator:
        # Only the very first event is inspected.
        start, tag, attrs, _pos = next(iterator)
        if not start or tag != "xml":
            raise OSError("Invalid XML file")

    # The XML spec says that no encoding === utf-8
    return attrs.get("encoding") or "utf-8"


def xml_readlines(source):
    """
    Get the lines from a given XML file.  Correctly determines the
    encoding and always returns unicode.

    Parameters
    ----------
    source : path-like, readable file-like, or callable
        Handle that contains the data or function that reads it.
        If a function or callable object, it must directly read from a stream.
        Non-callable objects must define a ``read`` method.

    Returns
    -------
    lines : list of str
        The result of ``readlines()`` on the source opened with the
        encoding reported by `get_xml_encoding`.
    """
    encoding = get_xml_encoding(source)

    with data.get_readable_fileobj(source, encoding=encoding) as fileobj:
        # Rewind before reading; presumably needed because detecting the
        # encoding may already have consumed part of the same stream.
        fileobj.seek(0)
        xml_lines = fileobj.readlines()

    return xml_lines