# Source code for asdf.yamlutil

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-

import warnings
from collections import OrderedDict

import numpy as np

import yaml

from . import schema
from . import tagged
from . import treeutil
from . import util
from .constants import YAML_TAG_PREFIX
from .versioning import split_tag_version
from .exceptions import AsdfConversionWarning

__all__ = ['custom_tree_to_tagged_tree', 'tagged_tree_to_custom_tree']

# Prefer the libyaml-backed C implementations when pyyaml was built with
# them; otherwise fall back to the pure-Python safe dumper/loader.
if getattr(yaml, '__with_libyaml__', None):  # pragma: no cover
    _yaml_base_dumper = yaml.CSafeDumper
    _yaml_base_loader = yaml.CSafeLoader
else:  # pragma: no cover
    _yaml_base_dumper = yaml.SafeDumper
    _yaml_base_loader = yaml.SafeLoader

# Tag used for ordered mappings.  The OrderedDict representers in this
# module reference this name, so it has to exist at module level.
YAML_OMAP_TAG = YAML_TAG_PREFIX + 'omap'


# ----------------------------------------------------------------------
# Custom loader/dumpers

class AsdfDumper(_yaml_base_dumper):
    """
    A specialized YAML dumper that understands "tagged basic Python
    data types" as implemented in the `tagged` module.
    """

    def __init__(self, *args, **kwargs):
        # Any caller-supplied default_flow_style is deliberately overridden
        # with None.
        kwargs['default_flow_style'] = None
        super().__init__(*args, **kwargs)

    def represent_data(self, data):
        """
        Represent ``data`` as a YAML node, carrying over its tag.

        The node is built by the base dumper; if ``data`` has a ``_tag``
        attribute that is not None, that value replaces the node's tag.
        """
        node = super().represent_data(data)

        tag_name = getattr(data, '_tag', None)
        if tag_name is not None:
            node.tag = tag_name

        return node

# Translates the ``flow_style`` attribute of a tagged container into the
# boolean pyyaml expects; anything else is looked up as None by the callers.
_flow_style_map = {
    'flow': True,
    'block': False,
}

def represent_sequence(dumper, sequence):
    """
    Represent a tagged list as a YAML sequence node.

    The requested ``sequence.flow_style`` is translated through
    ``_flow_style_map`` (unknown values become None).  The node is built
    with a None tag; ``AsdfDumper.represent_data`` applies the real tag
    afterwards.
    """
    flow_style = _flow_style_map.get(sequence.flow_style, None)
    # Hand pyyaml the plain underlying list rather than the tagged wrapper.
    sequence = sequence.data
    return super(AsdfDumper, dumper).represent_sequence(
        None, sequence, flow_style)

def represent_mapping(dumper, mapping):
    """
    Represent a tagged dict as a YAML mapping node.

    The requested ``mapping.flow_style`` is translated through
    ``_flow_style_map`` (unknown values become None).  When
    ``mapping.property_order`` is set, the node's entries are reordered:
    keys named in ``property_order`` come first, in that order, followed
    by all remaining entries in their original relative order.
    """
    flow_style = _flow_style_map.get(mapping.flow_style, None)
    # Hand pyyaml the plain underlying dict rather than the tagged wrapper.
    node = super(AsdfDumper, dumper).represent_mapping(
        None, mapping.data, flow_style)

    if mapping.property_order:
        values = node.value
        # Index the represented (key_node, value_node) pairs by key text.
        new_mapping = {}
        for key, val in values:
            new_mapping[key.value] = (key, val)

        # First the explicitly ordered keys that are actually present...
        new_values = []
        for key in mapping.property_order:
            if key in mapping:
                new_values.append(new_mapping[key])

        # ...then everything else, preserving the original order.
        property_order = set(mapping.property_order)
        for key, val in values:
            if key.value not in property_order:
                new_values.append((key, val))

        node.value = new_values

    return node

# Translates the ``style`` attribute of a tagged string into the scalar
# style indicator character passed to pyyaml.
_style_map = {
    'inline': '"',
    'folded': '>',
    'literal': '|',
}

def represent_scalar(dumper, value):
    """
    Represent a tagged string as a YAML scalar node.

    The requested ``value.style`` is translated through ``_style_map``
    (unknown values become None).  The node is built with a None tag;
    ``AsdfDumper.represent_data`` applies the real tag afterwards.
    """
    style = _style_map.get(value.style, None)
    # Hand pyyaml the plain underlying string rather than the tagged wrapper.
    return super(AsdfDumper, dumper).represent_scalar(
        None, value.data, style)

def represent_ordered_mapping(dumper, tag, data):
    """
    Represent an ordered mapping as a YAML sequence of one-item mappings.

    Parameters
    ----------
    dumper : yaml dumper instance
        Used to represent each key and value.
    tag : str
        Tag applied to the outer sequence node.
    data : mapping
        Its ``items()`` are emitted in iteration order.

    Returns
    -------
    yaml.SequenceNode
        One single-pair ``yaml.MappingNode`` per item of ``data``.
        ``YAML_OMAP_TAG`` is expected to be a module-level constant holding
        the omap tag.
    """
    # TODO: Again, adjust for preferred flow style, and other stylistic details
    # NOTE: For block style this uses the compact omap notation, but for flow style
    # it does not.

    # TODO: Need to see if I can figure out a mechanism so that classes that
    # use this representer can specify which values should use flow style
    values = []
    node = yaml.SequenceNode(tag, values,
                             flow_style=dumper.default_flow_style)
    # Register the node before descending so that aliases to this object
    # encountered while representing its children resolve to it.
    if dumper.alias_key is not None:
        dumper.represented_objects[dumper.alias_key] = node
    for key, value in data.items():
        key_item = dumper.represent_data(key)
        value_item = dumper.represent_data(value)
        node_item = yaml.MappingNode(YAML_OMAP_TAG,
                                     [(key_item, value_item)],
                                     flow_style=False)
        # ``values`` is the same list object the sequence node holds.
        values.append(node_item)
    return node

def represent_ordereddict(dumper, data):
    """Represent ``data`` via `represent_ordered_mapping` using YAML_OMAP_TAG."""
    omap_node = represent_ordered_mapping(dumper, YAML_OMAP_TAG, data)
    return omap_node

# Attach the custom representers to the types they handle.
for _handled_type, _type_representer in (
        (tagged.TaggedList, represent_sequence),
        (tagged.TaggedDict, represent_mapping),
        (tagged.TaggedString, represent_scalar),
        (OrderedDict, represent_ordereddict),
):
    AsdfDumper.add_representer(_handled_type, _type_representer)

# ----------------------------------------------------------------------
# Handle numpy scalars

# Every numpy floating subclass is dumped with the float representer and
# every numpy integer subclass with the int representer.
for _numpy_base, _scalar_representer in (
        (np.floating, AsdfDumper.represent_float),
        (np.integer, AsdfDumper.represent_int),
):
    for scalar_type in util.iter_subclasses(_numpy_base):
        AsdfDumper.add_representer(scalar_type, _scalar_representer)

def represent_numpy_str(dumper, data):
    """Represent ``data`` as a YAML string after coercing it with ``str()``."""
    # Coercing first matters for CSafeDumper, which raises an error when it
    # does not recognize the value as a string; the pure-Python SafeDumper
    # accepts np.str_ as-is.
    plain_text = str(data)
    return dumper.represent_str(plain_text)

# np.str_ values go through the str() coercion in represent_numpy_str;
# np.bytes_ values use the dumper's own binary representer.
AsdfDumper.add_representer(np.str_, represent_numpy_str)
AsdfDumper.add_representer(np.bytes_, AsdfDumper.represent_binary)

class AsdfLoader(_yaml_base_loader):
    """
    A specialized YAML loader that can construct "tagged basic Python
    data types" as implemented in the `tagged` module.
    """

    def construct_undefined(self, node):
        """
        Construct a tagged basic object for a node with an unhandled tag.

        Mapping, sequence and scalar nodes are turned into tagged
        dicts, lists and scalars respectively; any other node type is
        delegated to the base loader.
        """
        if isinstance(node, yaml.MappingNode):
            return self._construct_tagged_mapping(node)
        elif isinstance(node, yaml.SequenceNode):
            return self._construct_tagged_sequence(node)
        elif isinstance(node, yaml.ScalarNode):
            return self._construct_tagged_scalar(node)
        else:
            return super().construct_undefined(node)

    def _construct_tagged_mapping(self, node):
        # Generator constructor: the empty tagged container is yielded
        # first and filled in afterwards, mirroring pyyaml's own map
        # constructor.
        data = tagged.tag_object(node.tag, {})
        yield data
        data.update(self.construct_mapping(node))

    def _construct_tagged_sequence(self, node):
        # Same two-step pattern as _construct_tagged_mapping.
        data = tagged.tag_object(node.tag, [])
        yield data
        data.extend(self.construct_sequence(node))

    def _construct_tagged_scalar(self, node):
        return tagged.tag_object(node.tag, self.construct_scalar(node))

    # Custom omap deserializer that builds an OrderedDict instead
    # of a list of tuples.  Code is mostly identical to pyyaml's SafeConstructor.
    def construct_yaml_omap(self, node):
        """
        Construct an OrderedDict from a YAML omap node.

        Raises
        ------
        yaml.constructor.ConstructorError
            If ``node`` is not a sequence, or one of its items is not a
            mapping with exactly one entry.
        """
        # ConstructorError lives in the yaml.constructor submodule; it is
        # not re-exported at the top level of the yaml package.
        ConstructorError = yaml.constructor.ConstructorError

        omap = OrderedDict()
        yield omap
        if not isinstance(node, yaml.SequenceNode):
            raise ConstructorError(
                "while constructing an ordered map", node.start_mark,
                "expected a sequence, but found %s" % node.id,
                node.start_mark)
        for subnode in node.value:
            if not isinstance(subnode, yaml.MappingNode):
                raise ConstructorError(
                    "while constructing an ordered map", node.start_mark,
                    "expected a mapping of length 1, but found %s" % subnode.id,
                    subnode.start_mark)
            if len(subnode.value) != 1:
                raise ConstructorError(
                    "while constructing an ordered map", node.start_mark,
                    "expected a single mapping item, but found %d items" % len(subnode.value),
                    subnode.start_mark)
            key_node, value_node = subnode.value[0]
            key = self.construct_object(key_node)
            value = self.construct_object(value_node)
            omap[key] = value

# pyyaml will invoke the constructor associated with None when a node's
# tag is not explicitly handled by another constructor.
AsdfLoader.add_constructor(None, AsdfLoader.construct_undefined)
# Nodes tagged YAML_TAG_PREFIX + "omap" are built as an OrderedDict by
# AsdfLoader.construct_yaml_omap.
AsdfLoader.add_constructor(YAML_TAG_PREFIX + "omap", AsdfLoader.construct_yaml_omap)

def custom_tree_to_tagged_tree(tree, ctx):
    """
    Convert a tree, possibly containing custom data types that aren't
    directly representable in YAML, to a tree of basic data types,
    annotated with tags.

    Parameters
    ----------
    tree : object
        The tree to convert.
    ctx : object
        Context providing ``type_index``, ``version_string``,
        ``_ignore_implicit_conversion`` and ``_tree_modification_context``.

    Returns
    -------
    object
        The result of `treeutil.walk_and_modify` over ``tree``.
    """
    def walker(node):
        # Nodes whose type has no entry in the type index pass through
        # unchanged.
        tag = ctx.type_index.from_custom_type(type(node), ctx.version_string)
        if tag is not None:
            return tag.to_tree_tagged(node, ctx)
        return node

    return treeutil.walk_and_modify(
        tree,
        walker,
        ignore_implicit_conversion=ctx._ignore_implicit_conversion,
        # Walk the tree in preorder, so that extensions can return
        # container nodes with unserialized children.
        postorder=False,
        _context=ctx._tree_modification_context,
    )
def tagged_tree_to_custom_tree(tree, ctx, force_raw_types=False):
    """
    Convert a tree containing only basic data types, annotated with
    tags, to a tree containing custom data types.

    Parameters
    ----------
    tree : object
        The tagged tree to convert.
    ctx : object
        Context providing ``type_index``, ``_ignore_unrecognized_tag``,
        ``_ignore_implicit_conversion`` and ``_tree_modification_context``.
    force_raw_types : bool, optional
        When true, every node is returned unchanged.

    Returns
    -------
    object
        The result of `treeutil.walk_and_modify` over ``tree``.

    Notes
    -----
    Nodes that cannot be converted are left as raw data; an
    `AsdfConversionWarning` is issued for unrecognized tags (unless
    ``ctx._ignore_unrecognized_tag`` is set), incompatible tag versions,
    and conversions that raise `TypeError`.
    """
    def walker(node):
        if force_raw_types:
            return node

        tag = getattr(node, '_tag', None)
        if tag is None:
            return node

        tag_type = ctx.type_index.from_yaml_tag(ctx, tag)
        # This means the tag did not correspond to any type in our type index.
        if tag_type is None:
            if not ctx._ignore_unrecognized_tag:
                warnings.warn("{} is not recognized, converting to raw Python "
                              "data structure".format(tag),
                              AsdfConversionWarning)
            # Returned whether or not the warning was emitted.
            return node

        tag_name, tag_version = split_tag_version(tag)
        # This means that there is an explicit description of versions that are
        # compatible with the associated tag class implementation, but the
        # version we found does not fit that description.
        if tag_type.incompatible_version(tag_version):
            warnings.warn("Version {} of {} is not compatible with any "
                          "existing tag implementations".format(
                              tag_version, tag_name),
                          AsdfConversionWarning)
            return node

        # If a tag class does not explicitly list compatible versions, then all
        # versions of the corresponding schema are assumed to be compatible.
        # Therefore we need to check to make sure whether the conversion is
        # actually successful, and just return a raw Python data type if it is
        # not.
        try:
            return tag_type.from_tree_tagged(node, ctx)
        except TypeError as err:
            warnings.warn("Failed to convert {} to custom type (detail: {}). "
                          "Using raw Python data structure instead".format(tag, err),
                          AsdfConversionWarning)

        return node

    return treeutil.walk_and_modify(
        tree,
        walker,
        ignore_implicit_conversion=ctx._ignore_implicit_conversion,
        # Walk the tree in postorder, so that extensions receive
        # container nodes with children already deserialized.
        postorder=True,
        _context=ctx._tree_modification_context,
    )
def load_tree(stream):
    """
    Load YAML, returning a tree of objects.

    Parameters
    ----------
    stream : readable file-like object
        Stream containing the raw YAML content.
    """
    # The following call to yaml.load is safe because we're
    # using a loader that inherits from pyyaml's SafeLoader.
    return yaml.load(stream, Loader=AsdfLoader)  # nosec


def dump_tree(tree, fd, ctx):
    """
    Dump a tree of objects, possibly containing custom types, to YAML.

    Parameters
    ----------
    tree : object
        Tree of objects, possibly containing custom data types.

    fd : asdf.generic_io.GenericFile
        A file object to dump the serialized YAML to.

    ctx : Context
        The writing context.
    """
    # A throwaway subclass, so the context is attached to it rather than
    # to the shared AsdfDumper class.
    class AsdfDumperTmp(AsdfDumper):
        pass
    AsdfDumperTmp.ctx = ctx

    tags = None
    tree_type = ctx.type_index.from_custom_type(type(tree))
    if tree_type is not None:
        # Build the '!' tag handle prefix from the root type's tag: keep
        # everything up to and including the first '/' of the final
        # ':'-separated part, or drop that part entirely if it has no '/'.
        tag_parts = tree_type.yaml_tag.split(':')
        last_part = tag_parts[-1]
        if '/' in last_part:
            last_part = last_part[0:last_part.index('/') + 1]
        else:
            last_part = ''
        yaml_tag = ':'.join(tag_parts[0:-1] + [last_part])
        tags = {'!': yaml_tag}

    tree = custom_tree_to_tagged_tree(tree, ctx)
    schema.validate(tree, ctx)
    schema.remove_defaults(tree, ctx)

    yaml_version = tuple(
        int(x) for x in ctx.version_map['YAML_VERSION'].split('.'))

    yaml.dump_all(
        [tree], stream=fd, Dumper=AsdfDumperTmp,
        explicit_start=True, explicit_end=True,
        version=yaml_version,
        allow_unicode=True, encoding='utf-8',
        default_flow_style=None,
        tags=tags)