Source code for guiqwt.io

# -*- coding: utf-8 -*-
#
# Copyright © 2009-2012 CEA
# Pierre Raybaut
# Licensed under the terms of the CECILL License
# (see guiqwt/__init__.py for details)

# pylint: disable=C0103

"""
guiqwt.io
---------

The `io` module provides input/output helper functions:
    * :py:func:`guiqwt.io.imread`: load an image (.png, .tiff,
      .dicom, etc.) and return its data as a NumPy array
    * :py:func:`guiqwt.io.imwrite`: save an array to an image file
    * :py:func:`guiqwt.io.load_items`: load plot items from HDF5
    * :py:func:`guiqwt.io.save_items`: save plot items to HDF5

Reference
~~~~~~~~~

.. autofunction:: imread
.. autofunction:: imwrite
.. autofunction:: load_items
.. autofunction:: save_items
"""


import sys
import re
import os.path as osp
import numpy as np

# Local imports
from guiqwt.config import _


def scale_data_to_dtype(data, dtype):
    """Scale array `data` to fit datatype `dtype` dynamic range

    WARNING: modifies data in place"""
    info = np.iinfo(dtype)
    dmin = data.min()
    dmax = data.max()
    data -= dmin
    data *= float(info.max - info.min) / (dmax - dmin)
    data += float(info.min)
    return np.array(data, dtype)


def eliminate_outliers(data, percent=2.0, bins=256):
    """Eliminate data histogram outliers"""
    hist, bin_edges = np.histogram(data, bins)
    from guiqwt.histogram import hist_range_threshold

    vmin, vmax = hist_range_threshold(hist, bin_edges, percent)
    return data.clip(vmin, vmax)


# ===============================================================================
# I/O File type definitions
# ===============================================================================
class FileType(object):
    """Filetype object:
    * `name` : description of filetype,
    * `read_func`, `write_func` : I/O callbacks,
    * `extensions`: filename extensions (with a dot!) or filenames,
    (list, tuple or space-separated string)
    * `data_types`: supported data types"""

    def __init__(
        self,
        name,
        extensions,
        read_func=None,
        write_func=None,
        data_types=None,
        requires_template=False,
    ):
        self.name = name
        if isinstance(extensions, str):
            extensions = extensions.split()
        self.extensions = [osp.splitext(" " + ext)[1] for ext in extensions]
        self.read_func = read_func
        self.write_func = write_func
        self.data_types = data_types
        self.requires_template = requires_template

    def matches(self, action, dtype, template):
        """Return True if file type matches passed data type and template
        (or if dtype is None)"""
        assert action in ("load", "save")
        matches = dtype is None or self.data_types is None or dtype in self.data_types
        if action == "save" and self.requires_template:
            matches = matches and template is not None
        return matches

    @property
    def wcards(self):
        return "*" + (" *".join(self.extensions))

    def filters(self, action, dtype, template):
        assert action in ("load", "save")
        if self.matches(action, dtype, template):
            return "\n%s (%s)" % (self.name, self.wcards)
        else:
            return ""


class ImageIOHandler(object):
    """I/O handler: regroup all FileType objects"""

    def __init__(self):
        self.filetypes = []

    def allfilters(self, action, dtype, template):
        wcards = " ".join(
            [
                ftype.wcards
                for ftype in self.filetypes
                if ftype.matches(action, dtype, template)
            ]
        )
        return "%s (%s)" % (_("All supported files"), wcards)

    def get_filters(self, action, dtype=None, template=None):
        """Return file type filters for `action` (string: 'save' or 'load'),
        `dtype` data type (None: all data types), and `template` (True if save
        function requires a template (e.g. DICOM files), False otherwise)"""
        filters = self.allfilters(action, dtype, template)
        for ftype in self.filetypes:
            filters += ftype.filters(action, dtype, template)
        return filters

    def add(
        self,
        name,
        extensions,
        read_func=None,
        write_func=None,
        import_func=None,
        data_types=None,
        requires_template=None,
    ):
        if import_func is not None:
            try:
                import_func()
            except ImportError:
                return
        assert read_func is not None or write_func is not None
        ftype = FileType(
            name,
            extensions,
            read_func=read_func,
            write_func=write_func,
            data_types=data_types,
            requires_template=requires_template,
        )
        self.filetypes.append(ftype)

    def _get_filetype(self, ext):
        """Return FileType object associated to file extension `ext`"""
        for ftype in self.filetypes:
            if ext.lower() in ftype.extensions:
                return ftype
        else:
            raise RuntimeError("Unsupported file type: '%s'" % ext)

    def get_readfunc(self, ext):
        """Return read function associated to file extension `ext`"""
        ftype = self._get_filetype(ext)
        if ftype.read_func is None:
            raise RuntimeError("Unsupported file type (read): '%s'" % ext)
        else:
            return ftype.read_func

    def get_writefunc(self, ext):
        """Return read function associated to file extension `ext`"""
        ftype = self._get_filetype(ext)
        if ftype.write_func is None:
            raise RuntimeError("Unsupported file type (write): '%s'" % ext)
        else:
            return ftype.write_func


iohandler = ImageIOHandler()


# ==============================================================================
# tifffile-based Private I/O functions
# ==============================================================================


def _imread_tiff(filename):
    """Open a TIFF image and return a NumPy array"""
    try:
        import tifffile

        return tifffile.imread(filename)
    except ImportError:
        return _imread_pil(filename)


def _imwrite_tiff(filename, arr):
    """Save a NumPy array to a TIFF file"""
    try:
        import tifffile

        return tifffile.imwrite(filename, arr)
    except ImportError:
        return _imwrite_pil(filename, arr)


# ==============================================================================
# PIL-based Private I/O functions
# ==============================================================================
if sys.byteorder == "little":
    _ENDIAN = "<"
else:
    _ENDIAN = ">"

DTYPES = {
    "1": ("|b1", None),
    "L": ("|u1", None),
    "I": ("%si4" % _ENDIAN, None),
    "F": ("%sf4" % _ENDIAN, None),
    "I;16": ("%su2" % _ENDIAN, None),
    "I;16B": ("%su2" % _ENDIAN, None),
    "I;16S": ("%si2" % _ENDIAN, None),
    "P": ("|u1", None),
    "RGB": ("|u1", 3),
    "RGBX": ("|u1", 4),
    "RGBA": ("|u1", 4),
    "CMYK": ("|u1", 4),
    "YCbCr": ("|u1", 4),
}


def _imread_pil(filename, to_grayscale=False):
    """Open image with PIL and return a NumPy array"""
    import PIL.Image
    import PIL.TiffImagePlugin  # py2exe

    PIL.TiffImagePlugin.OPEN_INFO[(PIL.TiffImagePlugin.II, 0, 1, 1, (16,), ())] = (
        "I;16",
        "I;16",
    )
    img = PIL.Image.open(filename)
    if img.mode in ("CMYK", "YCbCr"):
        # Converting to RGB
        img = img.convert("RGB")
    if to_grayscale and img.mode in ("RGB", "RGBA", "RGBX"):
        # Converting to grayscale
        img = img.convert("L")
    elif "A" in img.mode or (img.mode == "P" and "transparency" in img.info):
        img = img.convert("RGBA")
    elif img.mode == "P":
        img = img.convert("RGB")
    try:
        dtype, extra = DTYPES[img.mode]
    except KeyError:
        raise RuntimeError("%s mode is not supported" % img.mode)
    shape = (img.size[1], img.size[0])
    if extra is not None:
        shape += (extra,)
    try:
        return np.array(img, dtype=np.dtype(dtype)).reshape(shape)
    except SystemError:
        return np.array(img.getdata(), dtype=np.dtype(dtype)).reshape(shape)


def _imwrite_pil(filename, arr):
    """Write `arr` NumPy array to `filename` using PIL"""
    import PIL.Image
    import PIL.TiffImagePlugin  # py2exe

    for mode, (dtype_str, extra) in list(DTYPES.items()):
        if dtype_str == arr.dtype.str:
            if extra is None:  # mode for grayscale images
                if len(arr.shape[2:]) > 0:
                    continue  # not suitable for RGB(A) images
                else:
                    break  # this is it!
            else:  # mode for RGB(A) images
                if len(arr.shape[2:]) == 0:
                    continue  # not suitable for grayscale images
                elif arr.shape[-1] == extra:
                    break  # this is it!
    else:
        raise RuntimeError("Cannot determine PIL data type")
    img = PIL.Image.fromarray(arr, mode)
    img.save(filename)


# ==============================================================================
# DICOM Private I/O functions
# ==============================================================================
def _import_dcm():
    """DICOM Import function (checking for required libraries):
    DICOM support requires library `pydicom`"""
    import logging

    logger = logging.getLogger("pydicom")
    logger.setLevel(logging.CRITICAL)
    from pydicom import dicomio  # analysis:ignore

    logger.setLevel(logging.WARNING)


def _imread_dcm(filename):
    """Open DICOM image with pydicom and return a NumPy array"""
    from pydicom import dicomio

    dcm = dicomio.read_file(filename, force=True)
    # **********************************************************************
    # The following is necessary until pydicom numpy support is improved:
    # (after that, a simple: 'arr = dcm.PixelArray' will work the same)
    format_str = "%sint%s" % (("u", "")[dcm.PixelRepresentation], dcm.BitsAllocated)
    try:
        dtype = np.dtype(format_str)
    except TypeError:
        raise TypeError(
            "Data type not understood by NumPy: "
            "PixelRepresentation=%d, BitsAllocated=%d"
            % (dcm.PixelRepresentation, dcm.BitsAllocated)
        )
    arr = np.fromstring(dcm.PixelData, dtype)
    try:
        # pydicom 0.9.3:
        dcm_is_little_endian = dcm.isLittleEndian
    except AttributeError:
        # pydicom 0.9.4:
        dcm_is_little_endian = dcm.is_little_endian
    if dcm_is_little_endian != (sys.byteorder == "little"):
        arr.byteswap(True)
    spp = getattr(dcm, "SamplesperPixel", 1)
    if hasattr(dcm, "NumberofFrames") and dcm.NumberofFrames > 1:
        if spp > 1:
            arr = arr.reshape(spp, dcm.NumberofFrames, dcm.Rows, dcm.Columns)
        else:
            arr = arr.reshape(dcm.NumberofFrames, dcm.Rows, dcm.Columns)
    else:
        if spp > 1:
            if dcm.BitsAllocated == 8:
                arr = arr.reshape(spp, dcm.Rows, dcm.Columns)
            else:
                raise NotImplementedError(
                    "This code only handles "
                    "SamplesPerPixel > 1 if Bits Allocated = 8"
                )
        else:
            arr = arr.reshape(dcm.Rows, dcm.Columns)
    # **********************************************************************
    return arr


def _imwrite_dcm(filename, arr, template=None):
    """Save a numpy array `arr` into a DICOM image file `filename`
    based on DICOM structure `template`"""
    # Note: due to IOHandler formalism, `template` has to be a keyword argument
    assert template is not None, (
        "The `template` keyword argument is required to save DICOM files\n"
        "(that is the template DICOM structure object)"
    )
    infos = np.iinfo(arr.dtype)
    template.BitsAllocated = infos.bits
    template.BitsStored = infos.bits
    template.HighBit = infos.bits - 1
    template.PixelRepresentation = ("u", "i").index(infos.kind)
    data_vr = ("US", "SS")[template.PixelRepresentation]
    template.Rows = arr.shape[0]
    template.Columns = arr.shape[1]
    template.SmallestImagePixelValue = int(arr.min())
    template[0x00280106].VR = data_vr
    template.LargestImagePixelValue = int(arr.max())
    template[0x00280107].VR = data_vr
    if not template.PhotometricInterpretation.startswith("MONOCHROME"):
        template.PhotometricInterpretation = "MONOCHROME1"
    template.PixelData = arr.tostring()
    template[0x7FE00010].VR = "OB"
    template.save_as(filename)


# ==============================================================================
# Text files Private I/O functions
# ==============================================================================
def _imread_txt(filename):
    """Open text file image and return a NumPy array"""
    for delimiter in ("\t", ",", " ", ";"):
        try:
            return np.loadtxt(filename, delimiter=delimiter)
        except ValueError:
            continue
    else:
        raise


def _imwrite_txt(filename, arr):
    """Write `arr` NumPy array to text file `filename`"""
    if arr.dtype in (np.int8, np.uint8, np.int16, np.uint16, np.int32, np.uint32):
        fmt = "%d"
    else:
        fmt = "%.18e"
    ext = osp.splitext(filename)[1]
    if ext.lower() in (".txt", ".asc", ""):
        np.savetxt(filename, arr, fmt=fmt)
    elif ext.lower() == ".csv":
        np.savetxt(filename, arr, fmt=fmt, delimiter=",")


# ==============================================================================
# Registering I/O functions
# ==============================================================================
iohandler.add(
    _("PNG files"),
    "*.png",
    read_func=_imread_pil,
    write_func=_imwrite_pil,
    data_types=(np.uint8, np.uint16),
)
iohandler.add(
    _("TIFF files"), "*.tif *.tiff", read_func=_imread_tiff, write_func=_imwrite_tiff
)
iohandler.add(
    _("8-bit images"),
    "*.jpg *.gif",
    read_func=_imread_pil,
    write_func=_imwrite_pil,
    data_types=(np.uint8,),
)
iohandler.add(_("NumPy arrays"), "*.npy", read_func=np.load, write_func=np.save)
iohandler.add(
    _("Text files"), "*.txt *.csv *.asc", read_func=_imread_txt, write_func=_imwrite_txt
)
iohandler.add(
    _("DICOM files"),
    "*.dcm",
    read_func=_imread_dcm,
    write_func=_imwrite_dcm,
    import_func=_import_dcm,
    data_types=(np.int8, np.uint8, np.int16, np.uint16),
    requires_template=True,
)


# ==============================================================================
# Generic image read/write functions
# ==============================================================================
[docs]def imread(fname, ext=None, to_grayscale=False): """Return a NumPy array from an image filename `fname`. If `to_grayscale` is True, convert RGB images to grayscale The `ext` (optional) argument is a string that specifies the file extension which defines the input format: when not specified, the input format is guessed from filename.""" if ext is None: _base, ext = osp.splitext(fname) arr = iohandler.get_readfunc(ext)(fname) if to_grayscale and arr.ndim == 3: # Converting to grayscale return arr[..., :4].mean(axis=2) else: return arr
[docs]def imwrite(fname, arr, ext=None, dtype=None, max_range=None, **kwargs): """Save a NumPy array to an image filename `fname`. If `to_grayscale` is True, convert RGB images to grayscale The `ext` (optional) argument is a string that specifies the file extension which defines the input format: when not specified, the input format is guessed from filename. If `max_range` is True, array data is scaled to fit the `dtype` (or data type itself if `dtype` is None) dynamic range Warning: option `max_range` changes data in place""" if ext is None: _base, ext = osp.splitext(fname) if max_range: arr = scale_data_to_dtype(arr, arr.dtype if dtype is None else dtype) iohandler.get_writefunc(ext)(fname, arr, **kwargs)
# ============================================================================== # Deprecated functions # ============================================================================== def imagefile_to_array(filename, to_grayscale=False): """ Return a NumPy array from an image file `filename` If `to_grayscale` is True, convert RGB images to grayscale """ print("io.imagefile_to_array is deprecated: use io.imread instead", file=sys.stderr) return imread(filename, to_grayscale=to_grayscale) def array_to_imagefile(arr, filename, mode=None, max_range=False): """ Save a numpy array `arr` into an image file `filename` Warning: option 'max_range' changes data in place """ print( "io.array_to_imagefile is deprecated: use io.imwrite instead", file=sys.stderr ) return imwrite(filename, arr, mode=mode, max_range=max_range) # ============================================================================== # guiqwt plot items I/O # ============================================================================== SERIALIZABLE_ITEMS = [] ITEM_MODULES = {} def register_serializable_items(modname, classnames): """Register serializable item from module name and class name""" global SERIALIZABLE_ITEMS, ITEM_MODULES SERIALIZABLE_ITEMS += classnames ITEM_MODULES[modname] = ITEM_MODULES.setdefault(modname, []) + classnames # Curves register_serializable_items( "guiqwt.curve", ["CurveItem", "PolygonMapItem", "ErrorBarCurveItem"] ) # Images register_serializable_items( "guiqwt.image", [ "RawImageItem", "ImageItem", "TrImageItem", "XYImageItem", "RGBImageItem", "MaskedImageItem", ], ) # Shapes register_serializable_items( "guiqwt.shapes", [ "Marker", "PolygonShape", "PointShape", "SegmentShape", "RectangleShape", "ObliqueRectangleShape", "EllipseShape", "Axes", ], ) # Annotations register_serializable_items( "guiqwt.annotations", [ "AnnotatedPoint", "AnnotatedSegment", "AnnotatedRectangle", "AnnotatedObliqueRectangle", "AnnotatedEllipse", "AnnotatedCircle", ], ) # Labels register_serializable_items( "guiqwt.label", ["LabelItem", "LegendBoxItem", "SelectedLegendBoxItem"] ) def item_class_from_name(name): """Return plot item class from class name""" global SERIALIZABLE_ITEMS, ITEM_MODULES assert name in SERIALIZABLE_ITEMS, "Unknown class %r" % name for modname, names in list(ITEM_MODULES.items()): if name in names: return getattr(__import__(modname, fromlist=[name]), name) def item_name_from_object(obj): """Return plot item class name from instance""" return obj.__class__.__name__ def save_item(writer, group_name, item): """Save plot item to HDF5 group""" with writer.group(group_name): if item is None: writer.write_none() else: item.serialize(writer) with writer.group("item_class_name"): writer.write_str(item_name_from_object(item)) def load_item(reader, group_name): """Load plot item from HDF5 group""" with reader.group(group_name): with reader.group("item_class_name"): try: klass_name = reader.read_str() except ValueError: # None was saved instead of a real item return klass = item_class_from_name(klass_name) item = klass() item.deserialize(reader) return item
[docs]def save_items(writer, items): """Save items to HDF5 file: * writer: :py:class:`guidata.hdf5io.HDF5Writer` object * items: serializable plot items""" counts = {} names = [] def _get_name(item): basename = item_name_from_object(item) count = counts[basename] = counts.setdefault(basename, 0) + 1 name = "%s_%03d" % (basename, count) names.append(name.encode("utf-8")) return name for item in items: with writer.group(_get_name(item)): item.serialize(writer) with writer.group("plot_items"): writer.write_sequence(names)
[docs]def load_items(reader): """Load items from HDF5 file: * reader: :py:class:`guidata.hdf5io.HDF5Reader` object""" with reader.group("plot_items"): names = reader.read_sequence() items = [] for name in names: try: name_str = name.decode() except AttributeError: name_str = name klass_name = re.match(r"([A-Z]+[A-Za-z0-9\_]*)\_([0-9]*)", name_str).groups()[0] klass = item_class_from_name(klass_name) item = klass() with reader.group(name): item.deserialize(reader) items.append(item) return items
if __name__ == "__main__": # Test if items can all be constructed from their Python module for name in SERIALIZABLE_ITEMS: print(name, "-->", item_class_from_name(name))