# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module handles the conversion of various VOTABLE datatypes
to/from TABLEDATA_ and BINARY_ formats.
"""
# STDLIB
import re
import sys
from struct import pack as _struct_pack
from struct import unpack as _struct_unpack
# THIRD-PARTY
import numpy as np
from numpy import ma
# ASTROPY
from astropy.utils.xml.writer import xml_escape_cdata
# LOCAL
from .exceptions import (
E01,
E02,
E03,
E04,
E05,
E06,
E24,
W01,
W30,
W31,
W39,
W46,
W47,
W49,
W51,
W55,
vo_raise,
vo_warn,
warn_or_raise,
)
# Names exported by ``from ... import *``.
__all__ = ["get_converter", "Converter", "table_column_to_votable_datatype"]
# Strict splitter: array elements are separated by runs of space characters only.
pedantic_array_splitter = re.compile(r" +")
array_splitter = re.compile(r"\s+|(?:\s*,\s*)")
"""
A regex to handle splitting values on either whitespace or commas.
SPEC: Usage of commas is not actually allowed by the spec, but many
files in the wild use them.
"""
# Four zero bytes: what the variable-length writers emit for an empty value
# (the same width as the ">I" length prefix written by Converter._write_length).
_zero_int = b"\0\0\0\0"
# Empty bytes object, used as a default value and as a join separator.
_empty_bytes = b""
# NUL byte; fixed-width char data is cut at its first occurrence when parsed.
_zero_byte = b"\0"
# Module-level aliases for struct.unpack / struct.pack used by the converters.
struct_unpack = _struct_unpack
struct_pack = _struct_pack
if sys.byteorder == "little":
def _ensure_bigendian(x):
if x.dtype.byteorder != ">":
return x.byteswap()
return x
else:
def _ensure_bigendian(x):
if x.dtype.byteorder == "<":
return x.byteswap()
return x
def _make_masked_array(data, mask):
"""
Masked arrays of zero length that also have a mask of zero length
cause problems in Numpy (at least in 1.6.2). This function
creates a masked array from data and a mask, unless it is zero
length.
"""
# np.ma doesn't like setting mask to []
if len(data):
return ma.array(np.array(data), mask=np.array(mask, dtype="bool"))
else:
return ma.array(np.array(data))
def bitarray_to_bool(data, length):
    """
    Converts a bit array (a string of bits in a bytes object) to a
    boolean Numpy array.

    Parameters
    ----------
    data : bytes
        The bit array.  The most significant bit of each byte is read
        first.
    length : int
        The number of bits to read.  Bits in *data* beyond *length* are
        ignored.  If *data* holds fewer than *length* bits, all available
        bits are returned.  A non-positive *length* yields an empty array.

    Returns
    -------
    array : numpy bool array
    """
    if length <= 0:
        # Nothing requested: return an empty array rather than every bit.
        return np.zeros(0, dtype="b1")
    # bytes() also accepts bytearray / memoryview / iterables of ints.
    octets = np.frombuffer(bytes(data), dtype=np.uint8)
    # unpackbits emits the most significant bit of each byte first.
    return np.unpackbits(octets)[:length].astype("b1")
def bool_to_bitarray(value):
    """
    Converts a numpy boolean array to a bit array (a string of bits in
    a bytes object).

    Parameters
    ----------
    value : numpy bool array
        Elements are consumed in flattened order and interpreted by
        truthiness.  Masked elements of a masked array count as False.

    Returns
    -------
    bit_array : bytes
        The first value in the input array will be the most
        significant bit in the result.  The length will be
        ``(N + 7) // 8`` where ``N`` is the number of elements in
        ``value``; unused trailing bits of the last byte are zero.
    """
    # Masked entries are written as 0 bits.
    flags = np.asarray(np.ma.filled(value, False)).astype(bool).ravel()
    # packbits fills each byte from its most significant bit and pads the
    # final byte with zero bits.
    return np.packbits(flags).tobytes()
class Converter:
    """
    The base class for all converters.  Each subclass handles
    converting a specific VOTABLE data type to/from the TABLEDATA_ and
    BINARY_ on-disk representations.

    Parameters
    ----------
    field : `~astropy.io.votable.tree.Field`
        object describing the datatype
    config : dict
        The parser configuration dictionary
    pos : tuple
        The position in the XML file where the FIELD object was
        found.  Used for error messages.
    """

    def __init__(self, field, config=None, pos=None):
        # The base class keeps no state; subclasses do the real setup.
        pass

    @staticmethod
    def _parse_length(read):
        """Read a 4-byte big-endian unsigned length using *read*."""
        return struct_unpack(">I", read(4))[0]

    @staticmethod
    def _write_length(length):
        """Encode *length* as a 4-byte big-endian unsigned integer."""
        return struct_pack(">I", int(length))

    def supports_empty_values(self, config):
        """
        Returns True when the field can be completely empty.

        The answer is the ``"version_1_3_or_later"`` entry of *config*
        (`None` if that entry is absent).
        """
        return config.get("version_1_3_or_later")

    def parse(self, value, config=None, pos=None):
        """
        Convert the string *value* from the TABLEDATA_ format into an
        object with the correct native in-memory datatype and mask flag.

        Parameters
        ----------
        value : str
            value in TABLEDATA format

        Returns
        -------
        native : tuple
            A two-element tuple of: value, mask.
            The value as a Numpy array or scalar, and *mask* is True
            if the value is missing.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        raise NotImplementedError("This datatype must implement a 'parse' method.")

    def parse_scalar(self, value, config=None, pos=None):
        """
        Parse a single scalar of the underlying type of the converter.
        For non-array converters, this is equivalent to parse.  For
        array converters, this is used to parse a single
        element of the array.

        Parameters
        ----------
        value : str
            value in TABLEDATA format

        Returns
        -------
        native : (2,) tuple
            (value, mask)
            The value as a Numpy array or scalar, and *mask* is True
            if the value is missing.
        """
        return self.parse(value, config, pos)

    def output(self, value, mask):
        """
        Convert the object *value* (in the native in-memory datatype)
        to a unicode string suitable for serializing in the TABLEDATA_
        format.

        Parameters
        ----------
        value
            The value, the native type corresponding to this converter
        mask : bool
            If `True`, will return the string representation of a
            masked value.

        Returns
        -------
        tabledata_repr : unicode

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        raise NotImplementedError("This datatype must implement a 'output' method.")

    def binparse(self, read):
        """
        Reads some number of bytes from the BINARY_ format
        representation by calling the function *read*, and returns the
        native in-memory object representation for the datatype
        handled by *self*.

        Parameters
        ----------
        read : function
            A function that given a number of bytes, returns a byte
            string.

        Returns
        -------
        native : (2,) tuple
            (value, mask). The value as a Numpy array or scalar, and *mask* is
            True if the value is missing.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        raise NotImplementedError("This datatype must implement a 'binparse' method.")

    def binoutput(self, value, mask):
        """
        Convert the object *value* in the native in-memory datatype to
        a string of bytes suitable for serialization in the BINARY_
        format.

        Parameters
        ----------
        value
            The value, the native type corresponding to this converter
        mask : bool
            If `True`, will return the string representation of a
            masked value.

        Returns
        -------
        bytes : bytes
            The binary representation of the value, suitable for
            serialization in the BINARY_ format.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        raise NotImplementedError("This datatype must implement a 'binoutput' method.")
class Char(Converter):
    """
    Converter for the ``char`` datatype (7-bit unsigned characters).

    Missing values are not handled for string or unicode types.
    """

    default = _empty_bytes

    def __init__(self, field, config=None, pos=None):
        config = {} if config is None else config
        Converter.__init__(self, field, config, pos)
        self.field_name = field.name

        if field.arraysize is None:
            # A missing arraysize is treated as a single character.
            vo_warn(W47, (), config, pos)
            field.arraysize = "1"

        if field.arraysize == "*":
            # Unbounded: stored as Python objects, length-prefixed in BINARY.
            self.arraysize = "*"
            self.format = "O"
            self.binparse = self._binparse_var
            self.binoutput = self._binoutput_var
            return

        # "N*" is handled like a fixed width of N.
        if field.arraysize.endswith("*"):
            field.arraysize = field.arraysize[:-1]
        try:
            self.arraysize = int(field.arraysize)
        except ValueError:
            vo_raise(E01, (field.arraysize, "char", field.ID), config)
        self.format = f"U{self.arraysize:d}"
        self.binparse = self._binparse_fixed
        self.binoutput = self._binoutput_fixed
        self._struct_format = f">{self.arraysize:d}s"

    def supports_empty_values(self, config):
        return True

    def parse(self, value, config=None, pos=None):
        overlong = self.arraysize != "*" and len(value) > self.arraysize
        if overlong:
            vo_warn(W46, ("char", self.arraysize), config, pos)

        # Warn about non-ascii characters if warnings are enabled.
        try:
            value.encode("ascii")
        except UnicodeEncodeError:
            vo_warn(W55, (self.field_name, value), config, pos)
        return value, False

    def output(self, value, mask):
        if mask:
            return ""

        # *value* is assumed to be either str or bytes.  The result must be
        # a str, and non-ASCII content has to be reported.
        try:
            if isinstance(value, str):
                value.encode("ascii")
            else:
                # Decoding doubles as the non-ASCII check for bytes.
                value = value.decode("ascii")
        except ValueError:
            # UnicodeEncodeError / UnicodeDecodeError are ValueError subclasses.
            warn_or_raise(E24, UnicodeEncodeError, (value, self.field_name))
        finally:
            # Bytes that failed the ASCII decode still have to become str.
            if isinstance(value, bytes):
                value = value.decode("utf-8")

        return xml_escape_cdata(value)

    def _binparse_var(self, read):
        nbytes = self._parse_length(read)
        return read(nbytes).decode("ascii"), False

    def _binparse_fixed(self, read):
        raw = struct_unpack(self._struct_format, read(self.arraysize))[0]
        terminator = raw.find(_zero_byte)
        text = raw.decode("ascii")
        if terminator == -1:
            return text, False
        # Drop the NUL padding.
        return text[:terminator], False

    def _binoutput_var(self, value, mask):
        if mask or value is None or value == "":
            return _zero_int
        if isinstance(value, str):
            try:
                value = value.encode("ascii")
            except ValueError:
                vo_raise(E24, (value, self.field_name))
        prefix = self._write_length(len(value))
        return prefix + value

    def _binoutput_fixed(self, value, mask):
        if mask:
            value = _empty_bytes
        elif isinstance(value, str):
            try:
                value = value.encode("ascii")
            except ValueError:
                vo_raise(E24, (value, self.field_name))
        # struct packs "Ns" fields to exactly N bytes.
        return struct_pack(self._struct_format, value)
class UnicodeChar(Converter):
    """
    Converter for the ``unicodeChar`` datatype, serialized as UTF-16-BE
    in the BINARY_ format.

    Missing values are not handled for string or unicode types.
    """

    default = ""

    def __init__(self, field, config=None, pos=None):
        Converter.__init__(self, field, config, pos)

        if field.arraysize is None:
            # A missing arraysize is treated as a single character.
            vo_warn(W47, (), config, pos)
            field.arraysize = "1"

        if field.arraysize == "*":
            self.arraysize = "*"
            self.format = "O"
            self.binparse = self._binparse_var
            self.binoutput = self._binoutput_var
            return

        try:
            self.arraysize = int(field.arraysize)
        except ValueError:
            vo_raise(E01, (field.arraysize, "unicode", field.ID), config)
        self.format = f"U{self.arraysize:d}"
        self.binparse = self._binparse_fixed
        self.binoutput = self._binoutput_fixed
        # Two bytes per UTF-16 code unit.
        self._struct_format = f">{self.arraysize*2:d}s"

    def parse(self, value, config=None, pos=None):
        overlong = self.arraysize != "*" and len(value) > self.arraysize
        if overlong:
            vo_warn(W46, ("unicodeChar", self.arraysize), config, pos)
        return value, False

    def output(self, value, mask):
        return "" if mask else xml_escape_cdata(str(value))

    def _binparse_var(self, read):
        nunits = self._parse_length(read)
        return read(nunits * 2).decode("utf_16_be"), False

    def _binparse_fixed(self, read):
        raw = struct_unpack(self._struct_format, read(self.arraysize * 2))[0]
        text = raw.decode("utf_16_be")
        terminator = text.find("\0")
        if terminator == -1:
            return text, False
        # Drop the NUL padding.
        return text[:terminator], False

    def _binoutput_var(self, value, mask):
        if mask or value is None or value == "":
            return _zero_int
        payload = value.encode("utf_16_be")
        # The length prefix counts 2-byte code units, not bytes.
        return self._write_length(len(payload) // 2) + payload

    def _binoutput_fixed(self, value, mask):
        text = "" if mask else value
        return struct_pack(self._struct_format, text.encode("utf_16_be"))
class Array(Converter):
    """
    Common base for the fixed- and variable-length array converters.

    Picks the element splitter according to ``config["verify"]``:
    the pedantic one when it is ``"exception"``, the lax one otherwise.
    """

    def __init__(self, field, config=None, pos=None):
        if config is None:
            config = {}
        Converter.__init__(self, field, config, pos)
        strict = config.get("verify", "ignore") == "exception"
        self._splitter = self._splitter_pedantic if strict else self._splitter_lax

    def parse_scalar(self, value, config=None, pos=0):
        # Single elements are handled by the wrapped scalar converter.
        return self._base.parse_scalar(value, config, pos)

    @staticmethod
    def _splitter_pedantic(value, config=None, pos=None):
        # Elements separated by spaces only.
        return pedantic_array_splitter.split(value)

    @staticmethod
    def _splitter_lax(value, config=None, pos=None):
        # Commas are tolerated, but reported.
        if "," in value:
            vo_warn(W01, (), config, pos)
        return array_splitter.split(value)
class VarArray(Array):
    """
    Handles variable lengths arrays (i.e. where *arraysize* is '*').

    Parameters
    ----------
    field : `~astropy.io.votable.tree.Field`
        object describing the datatype
    base : Converter
        Converter for a single element of the array.
    arraysize : list
        Accepted for signature compatibility with the other array
        converters; not used here.
    config : dict, optional
        The parser configuration dictionary
    pos : tuple, optional
        Position in the XML file, used for error messages.
    """

    format = "O"

    def __init__(self, field, base, arraysize, config=None, pos=None):
        Array.__init__(self, field, config, pos)
        self._base = base
        self.default = np.array([], dtype=self._base.format)

    def output(self, value, mask):
        """Serialize each element with the base converter, space-separated."""
        output = self._base.output
        result = [output(x, m) for x, m in np.broadcast(value, mask)]
        return " ".join(result)

    def binparse(self, read):
        """Read a length prefix, then that many elements, from *read*."""
        length = self._parse_length(read)
        binparse = self._base.binparse
        result = []
        result_mask = []
        for _ in range(length):
            val, mask = binparse(read)
            result.append(val)
            result_mask.append(mask)
        return _make_masked_array(result, result_mask), False

    def binoutput(self, value, mask):
        """
        Serialize *value* as a length prefix followed by each element.

        The per-element mask comes from *value* itself; plain (unmasked)
        arrays and masked arrays without an explicit mask are treated as
        having no masked elements.
        """
        if value is None or len(value) == 0:
            return _zero_int

        # getmaskarray always returns a full boolean array, unlike
        # ``value.mask`` which may be missing or the scalar ``nomask``.
        element_masks = ma.getmaskarray(value)
        binoutput = self._base.binoutput
        result = [self._write_length(len(value))]
        result.extend(binoutput(x, m) for x, m in zip(value, element_masks))
        return _empty_bytes.join(result)
class ArrayVarArray(VarArray):
    """
    Converter for a variable-length sequence of fixed-size arrays, i.e.
    where *arraysize* ends in '*'.
    """

    def parse(self, value, config=None, pos=None):
        if not value.strip():
            return ma.array([]), False

        tokens = self._splitter(value, config, pos)
        per_item = self._base._items
        parse_chunk = self._base.parse_parts
        # The token count must be a whole number of inner arrays.
        if len(tokens) % per_item != 0:
            vo_raise(E02, (per_item, len(tokens)), config, pos)

        values = []
        masks = []
        for start in range(0, len(tokens), per_item):
            chunk_value, chunk_mask = parse_chunk(
                tokens[start : start + per_item], config, pos
            )
            values.append(chunk_value)
            masks.append(chunk_mask)
        return _make_masked_array(values, masks), False
class ScalarVarArray(VarArray):
    """
    Converter for a variable-length array of numeric scalars.
    """

    def parse(self, value, config=None, pos=None):
        if not value.strip():
            return ma.array([]), False

        parse_one = self._base.parse
        values = []
        masks = []
        for token in self._splitter(value, config, pos):
            element, element_mask = parse_one(token, config, pos)
            values.append(element)
            masks.append(element_mask)
        return _make_masked_array(values, masks), False
class NumericArray(Array):
    """
    Handles a fixed-length array of numeric scalars.

    Parameters
    ----------
    field : `~astropy.io.votable.tree.Field`
        object describing the datatype
    base : Converter
        Converter for a single element.
    arraysize : list of int
        Shape of the array, in numpy order.
    config : dict, optional
        The parser configuration dictionary
    pos : tuple, optional
        Position in the XML file, used for error messages.
    """

    vararray_type = ArrayVarArray

    def __init__(self, field, base, arraysize, config=None, pos=None):
        Array.__init__(self, field, config, pos)

        self._base = base
        self._arraysize = arraysize
        self.format = f"{tuple(arraysize)}{base.format}"

        # Total number of scalar elements.
        self._items = 1
        for dim in arraysize:
            self._items *= dim

        self._memsize = np.dtype(self.format).itemsize
        self._bigendian_format = ">" + self.format

        self.default = np.empty(arraysize, dtype=self._base.format)
        self.default[...] = self._base.default

    def parse(self, value, config=None, pos=None):
        """
        Parse a TABLEDATA_ cell into an array and a matching mask array.

        When *config* is given, marks VOTable 1.3 or later, and *value*
        is empty, a zero-filled array with mask `True` is returned.  A
        wrong element count is reported through E02; unless
        ``config["verify"]`` is ``"exception"``, extra elements are then
        dropped and missing ones are filled with the base default.
        """
        if config is None:
            config = {}
        elif config.get("version_1_3_or_later") and value == "":
            # .get(): a config dict without the version key must not fail.
            return np.zeros(self._arraysize, dtype=self._base.format), True

        parts = self._splitter(value, config, pos)
        if len(parts) != self._items:
            warn_or_raise(E02, E02, (self._items, len(parts)), config, pos)

        if config.get("verify", "ignore") != "exception":
            if len(parts) > self._items:
                parts = parts[: self._items]
            elif len(parts) < self._items:
                parts = parts + ([self._base.default] * (self._items - len(parts)))
        return self.parse_parts(parts, config, pos)

    def parse_parts(self, parts, config=None, pos=None):
        """Parse already-split elements and reshape them to the array shape."""
        base_parse = self._base.parse
        result = []
        result_mask = []
        for x in parts:
            value, mask = base_parse(x, config, pos)
            result.append(value)
            result_mask.append(mask)
        result = np.array(result, dtype=self._base.format).reshape(self._arraysize)
        result_mask = np.array(result_mask, dtype="bool").reshape(self._arraysize)
        return result, result_mask

    def output(self, value, mask):
        """Serialize every element with the base converter, space-separated."""
        base_output = self._base.output
        value = np.asarray(value)
        mask = np.asarray(mask)
        if mask.size <= 1:
            func = np.broadcast
        else:  # When mask is already array but value is scalar, this prevents broadcast
            func = zip
        return " ".join(base_output(x, m) for x, m in func(value.flat, mask.flat))

    def binparse(self, read):
        """Read one whole array from the BINARY_ stream via *read*."""
        result = np.frombuffer(read(self._memsize), dtype=self._bigendian_format)[0]
        result_mask = self._base.is_null(result)
        return result, result_mask

    def binoutput(self, value, mask):
        """Return the big-endian bytes of *value* after the base converter's filter_array."""
        filtered = self._base.filter_array(value, mask)
        filtered = _ensure_bigendian(filtered)
        return filtered.tobytes()
class Numeric(Converter):
    """
    Common base for the numeric datatypes.

    Subclasses supply ``format`` (a numpy dtype string).  If the field
    declares a null value, it becomes both the default and the value
    that marks an element as missing; otherwise NaN marks missing.
    """

    array_type = NumericArray
    vararray_type = ScalarVarArray
    null = None

    def __init__(self, field, config=None, pos=None):
        Converter.__init__(self, field, config, pos)

        self._memsize = np.dtype(self.format).itemsize
        self._bigendian_format = ">" + self.format

        declared_null = field.values.null
        if declared_null is None:
            self.is_null = np.isnan
        else:
            self.null = np.asarray(declared_null, dtype=self.format)
            self.default = self.null
            self.is_null = self._is_null

    def binparse(self, read):
        # One big-endian scalar of ``format``.
        scalar = np.frombuffer(read(self._memsize), dtype=self._bigendian_format)[0]
        return scalar, self.is_null(scalar)

    def _is_null(self, value):
        return value == self.null
class FloatingPoint(Numeric):
    """
    The base class for floating-point datatypes.

    The TABLEDATA_ output format is derived from the field's ``width``
    and ``precision``; with no precision the ``repr`` of the value is
    used.  ``config["verify"] == "exception"`` selects the pedantic
    parser, anything else the permissive one.
    """

    default = np.nan

    def __init__(self, field, config=None, pos=None):
        if config is None:
            config = {}

        Numeric.__init__(self, field, config, pos)

        precision = field.precision
        width = field.width

        # Assemble a str.format template such as "{!r:>}" or "{:10.3f}".
        if precision is None:
            format_parts = ["{!r:>"]
        else:
            format_parts = ["{:"]

        if width is not None:
            format_parts.append(str(width))

        if precision is not None:
            if precision.startswith("E"):
                format_parts.append(f".{int(precision[1:]):d}g")
            elif precision.startswith("F"):
                format_parts.append(f".{int(precision[1:]):d}f")
            else:
                format_parts.append(f".{int(precision):d}f")

        format_parts.append("}")

        self._output_format = "".join(format_parts)

        self.nan = np.array(np.nan, self.format)

        # Pre-compute the text and binary forms written for masked values.
        if self.null is None:
            self._null_output = "NaN"
            self._null_binoutput = self.binoutput(self.nan, False)
            self.filter_array = self._filter_nan
        else:
            self._null_output = self.output(np.asarray(self.null), False)
            self._null_binoutput = self.binoutput(np.asarray(self.null), False)
            self.filter_array = self._filter_null

        if config.get("verify", "ignore") == "exception":
            self.parse = self._parse_pedantic
        else:
            self.parse = self._parse_permissive

    def supports_empty_values(self, config):
        return True

    def _parse_pedantic(self, value, config=None, pos=None):
        """Parse strictly: blank means missing, anything else must be a float."""
        if value.strip() == "":
            return self.null, True
        f = float(value)
        return f, self.is_null(f)

    def _parse_permissive(self, value, config=None, pos=None):
        """Parse leniently: unparsable text is treated as missing (W30 if non-blank)."""
        try:
            f = float(value)
            return f, self.is_null(f)
        except ValueError:
            # IRSA VOTables use the word 'null' to specify empty values,
            # but this is not defined in the VOTable spec.
            if value.strip() != "":
                vo_warn(W30, (value,), config, pos)
            return self.null, True

    @property
    def output_format(self):
        """The `str.format` template used by `output`."""
        return self._output_format

    def output(self, value, mask):
        """
        Return the TABLEDATA_ text for *value*.

        Masked values give the pre-computed null text; non-finite values
        give ``NaN``, ``+InF`` or ``-InF``.
        """
        if mask:
            return self._null_output
        if np.isfinite(value):
            if not np.isscalar(value):
                value = value.dtype.type(value)
            result = self._output_format.format(value)
            if result.startswith("array"):
                raise RuntimeError()
            # repr-based output: write integral values without ".0".
            if self._output_format[2] == "r" and result.endswith(".0"):
                result = result[:-2]
            return result
        elif np.isnan(value):
            return "NaN"
        elif np.isposinf(value):
            return "+InF"
        elif np.isneginf(value):
            return "-InF"
        # Should never raise
        raise ValueError(f"Invalid floating point value '{value}'")

    def binoutput(self, value, mask):
        """Return the big-endian bytes of *value*, or the null bytes if masked."""
        if mask:
            return self._null_binoutput

        value = _ensure_bigendian(value)
        return value.tobytes()

    def _filter_nan(self, value, mask):
        return np.where(mask, np.nan, value)

    def _filter_null(self, value, mask):
        return np.where(mask, self.null, value)
class Double(FloatingPoint):
    """
    Converter for the ``double`` datatype: double-precision IEEE
    floating-point, held as numpy ``f8``.
    """

    format = "f8"
class Float(FloatingPoint):
    """
    Converter for the ``float`` datatype: single-precision IEEE
    floating-point, held as numpy ``f4``.
    """

    format = "f4"
class Integer(Numeric):
    """
    The base class for all the integral datatypes.

    Subclasses provide ``format``, ``val_range`` (inclusive minimum and
    maximum) and ``bit_size`` (a label used in range warnings).
    """

    default = 0

    def __init__(self, field, config=None, pos=None):
        Numeric.__init__(self, field, config, pos)

    def parse(self, value, config=None, pos=None):
        """
        Parse a TABLEDATA_ value into ``(value, mask)``.

        Strings may be decimal, ``0x``-prefixed hexadecimal, ``nan`` or
        empty; non-string input is passed through ``int()``.  A value
        equal to the declared null is masked.  Out-of-range values are
        reported through W51 and clamped to ``val_range``.
        """
        if config is None:
            config = {}

        mask = False
        if isinstance(value, str):
            value = value.lower()
            if value == "":
                # .get(): the version key may be absent, in particular
                # when *config* was defaulted to {} just above.
                if config.get("version_1_3_or_later"):
                    mask = True
                else:
                    warn_or_raise(W49, W49, (), config, pos)
                if self.null is not None:
                    value = self.null
                else:
                    value = self.default
            elif value == "nan":
                mask = True
                if self.null is None:
                    warn_or_raise(W31, W31, (), config, pos)
                    value = self.default
                else:
                    value = self.null
            elif value.startswith("0x"):
                value = int(value[2:], 16)
            else:
                value = int(value, 10)
        else:
            value = int(value)
        if self.null is not None and value == self.null:
            mask = True

        if value < self.val_range[0]:
            warn_or_raise(W51, W51, (value, self.bit_size), config, pos)
            value = self.val_range[0]
        elif value > self.val_range[1]:
            warn_or_raise(W51, W51, (value, self.bit_size), config, pos)
            value = self.val_range[1]

        return value, mask

    def output(self, value, mask):
        """Return the text for *value*; masked values give the null value, or NaN (with W31) if none is declared."""
        if mask:
            if self.null is None:
                warn_or_raise(W31, W31)
                return "NaN"
            return str(self.null)
        return str(value)

    def binoutput(self, value, mask):
        """Return big-endian bytes; a masked value needs a declared null (else W31 is raised)."""
        if mask:
            if self.null is None:
                vo_raise(W31)
            else:
                value = self.null

        value = _ensure_bigendian(value)
        return value.tobytes()

    def filter_array(self, value, mask):
        """Replace masked elements with the null value; W31 is raised if none is declared."""
        if np.any(mask):
            if self.null is not None:
                return np.where(mask, self.null, value)
            else:
                vo_raise(W31)
        return value
class UnsignedByte(Integer):
    """
    Converter for the ``unsignedByte`` datatype: unsigned 8-bit integer.
    """

    format = "u1"
    val_range = (0, 2**8 - 1)
    bit_size = "8-bit unsigned"
class Short(Integer):
    """
    Converter for the ``short`` datatype: signed 16-bit integer.
    """

    format = "i2"
    val_range = (-(2**15), 2**15 - 1)
    bit_size = "16-bit"
class Int(Integer):
    """
    Converter for the ``int`` datatype: signed 32-bit integer.
    """

    format = "i4"
    val_range = (-(2**31), 2**31 - 1)
    bit_size = "32-bit"
class Long(Integer):
    """
    Converter for the ``long`` datatype: signed 64-bit integer.
    """

    format = "i8"
    val_range = (-(2**63), 2**63 - 1)
    bit_size = "64-bit"
class ComplexArrayVarArray(VarArray):
    """
    Converter for a variable-length sequence of fixed-size arrays of
    complex numbers.
    """

    def parse(self, value, config=None, pos=None):
        if not value.strip():
            return ma.array([]), True

        tokens = self._splitter(value, config, pos)
        per_item = self._base._items
        parse_chunk = self._base.parse_parts
        # The token count must be a whole number of inner arrays.
        if len(tokens) % per_item != 0:
            vo_raise(E02, (per_item, len(tokens)), config, pos)

        values = []
        masks = []
        for start in range(0, len(tokens), per_item):
            chunk_value, chunk_mask = parse_chunk(
                tokens[start : start + per_item], config, pos
            )
            values.append(chunk_value)
            masks.append(chunk_mask)
        return _make_masked_array(values, masks), False
class ComplexVarArray(VarArray):
    """
    Converter for a variable-length array of complex numbers.
    """

    def parse(self, value, config=None, pos=None):
        if not value.strip():
            return ma.array([]), True

        tokens = self._splitter(value, config, pos)
        parse_pair = self._base.parse_parts
        values = []
        masks = []
        # Tokens are consumed two at a time: real part, imaginary part.
        for start in range(0, len(tokens), 2):
            components = [float(tok) for tok in tokens[start : start + 2]]
            number, number_mask = parse_pair(components, config, pos)
            values.append(number)
            masks.append(number_mask)

        data = np.array(values, dtype=self._base.format)
        return _make_masked_array(data, masks), False
class ComplexArray(NumericArray):
    """
    Converter for a fixed-size array of complex numbers.
    """

    vararray_type = ComplexArrayVarArray

    def __init__(self, field, base, arraysize, config=None, pos=None):
        NumericArray.__init__(self, field, base, arraysize, config, pos)
        # Each complex element is written as two tokens.
        self._items *= 2

    def parse(self, value, config=None, pos=None):
        tokens = self._splitter(value, config, pos)
        if tokens == [""]:
            tokens = []
        return self.parse_parts(tokens, config, pos)

    def parse_parts(self, parts, config=None, pos=None):
        if len(parts) != self._items:
            vo_raise(E02, (self._items, len(parts)), config, pos)

        parse_pair = self._base.parse_parts
        values = []
        masks = []
        for start in range(0, self._items, 2):
            components = [float(tok) for tok in parts[start : start + 2]]
            number, number_mask = parse_pair(components, config, pos)
            values.append(number)
            masks.append(number_mask)

        shape = self._arraysize
        values = np.array(values, dtype=self._base.format).reshape(shape)
        masks = np.array(masks, dtype="bool").reshape(shape)
        return values, masks
class Complex(FloatingPoint, Array):
    """
    Common base for the complex datatypes.

    A value is written as two whitespace-separated floats: the real
    part followed by the imaginary part.
    """

    array_type = ComplexArray
    vararray_type = ComplexVarArray
    default = np.nan

    def __init__(self, field, config=None, pos=None):
        FloatingPoint.__init__(self, field, config, pos)
        Array.__init__(self, field, config, pos)

    def parse(self, value, config=None, pos=None):
        text = value.strip()
        if text == "" or text.lower() == "nan":
            return np.nan, True
        components = [float(tok) for tok in self._splitter(value, config, pos)]
        if len(components) != 2:
            vo_raise(E03, (value,), config, pos)
        return self.parse_parts(components, config, pos)

    # FloatingPoint.__init__ binds one of these names as ``self.parse``;
    # both verify modes use the same parser for complex values.
    _parse_permissive = parse
    _parse_pedantic = parse

    def parse_parts(self, parts, config=None, pos=None):
        number = complex(*parts)
        return number, self.is_null(number)

    def output(self, value, mask):
        if mask:
            if self.null is None:
                return "NaN"
            value = self.null

        template = self._output_format
        real = template.format(float(value.real))
        imag = template.format(float(value.imag))
        if template[2] == "r":
            # repr-based output: write integral parts without ".0".
            if real.endswith(".0"):
                real = real[:-2]
            if imag.endswith(".0"):
                imag = imag[:-2]
        return real + " " + imag
class FloatComplex(Complex):
    """
    Converter for the ``floatComplex`` datatype: a pair of
    single-precision IEEE floating-point numbers, held as numpy ``c8``.
    """

    format = "c8"
class DoubleComplex(Complex):
    """
    Converter for the ``doubleComplex`` datatype: a pair of
    double-precision IEEE floating-point numbers, held as numpy ``c16``.
    """

    format = "c16"
class BitArray(NumericArray):
    """
    Converter for an array of bits.

    In TABLEDATA_ the bits are a run of ``0``/``1`` characters; in
    BINARY_ they are packed eight to a byte.
    """

    vararray_type = ArrayVarArray

    def __init__(self, field, base, arraysize, config=None, pos=None):
        NumericArray.__init__(self, field, base, arraysize, config, pos)
        # Number of bytes read per array in the BINARY_ format.
        self._bytes = ((self._items - 1) // 8) + 1

    @staticmethod
    def _splitter_pedantic(value, config=None, pos=None):
        # One element per character, whitespace removed.
        return list(re.sub(r"\s", "", value))

    @staticmethod
    def _splitter_lax(value, config=None, pos=None):
        # Commas are tolerated, but reported.
        if "," in value:
            vo_warn(W01, (), config, pos)
        return list(re.sub(r"\s|,", "", value))

    def output(self, value, mask):
        # Bits cannot express a missing value.
        if np.any(mask):
            vo_warn(W39)
        digits = {False: "0", True: "1"}
        bits = np.asarray(value).flat
        return "".join(digits[bit] for bit in bits)

    def binparse(self, read):
        packed = read(self._bytes)
        shape = self._arraysize
        bits = bitarray_to_bool(packed, self._items).reshape(shape)
        # Nothing is ever masked on read.
        return bits, np.zeros(shape, dtype="b1")

    def binoutput(self, value, mask):
        if np.any(mask):
            vo_warn(W39)
        return bool_to_bitarray(value)
class Bit(Converter):
    """
    Handles the bit datatype.

    Bits cannot express a missing value; writing a masked value emits
    W39 and the underlying value is written anyway.
    """

    format = "b1"
    array_type = BitArray
    vararray_type = ScalarVarArray
    default = False
    binary_one = b"\x08"
    binary_zero = b"\0"

    def parse(self, value, config=None, pos=None):
        """
        Parse ``"1"`` / ``"0"`` into ``(bool, mask)``.

        `False` or a blank string is a missing value (W49 unless
        *config* marks VOTable 1.3 or later).  Any other text raises
        E04.
        """
        if config is None:
            config = {}
        mapping = {"1": True, "0": False}
        if value is False or value.strip() == "":
            # .get(): the version key may be absent, in particular when
            # *config* was defaulted to {} just above.
            if not config.get("version_1_3_or_later"):
                warn_or_raise(W49, W49, (), config, pos)
            return False, True
        try:
            return mapping[value], False
        except KeyError:
            vo_raise(E04, (value,), config, pos)

    def output(self, value, mask):
        """Return ``"1"`` or ``"0"`` by truthiness of *value* (W39 if masked)."""
        if mask:
            vo_warn(W39)

        if value:
            return "1"
        else:
            return "0"

    def binparse(self, read):
        """Read one byte; the bit is set when the byte has the 0x8 bit set."""
        data = read(1)
        return (ord(data) & 0x8) != 0, False

    def binoutput(self, value, mask):
        """Return ``binary_one`` or ``binary_zero`` by truthiness of *value* (W39 if masked)."""
        if mask:
            vo_warn(W39)

        if value:
            return self.binary_one
        return self.binary_zero
class BooleanArray(NumericArray):
    """
    Converter for an array of boolean values.
    """

    vararray_type = ArrayVarArray

    def binparse(self, read):
        # One byte per element.
        raw = read(self._items)
        decode = self._base.binparse_value
        values = []
        masks = []
        for octet in raw:
            flag, flag_mask = decode(octet)
            values.append(flag)
            masks.append(flag_mask)

        shape = self._arraysize
        values = np.array(values, dtype="b1").reshape(shape)
        masks = np.array(masks, dtype="b1").reshape(shape)
        return values, masks

    def binoutput(self, value, mask):
        encode = self._base.binoutput
        flat_values = np.asarray(value).flat
        flat_masks = np.asarray(mask).flat
        pieces = [encode(v, m) for v, m in np.broadcast(flat_values, flat_masks)]
        return _empty_bytes.join(pieces)
class Boolean(Converter):
    """
    Converter for the ``boolean`` datatype.

    Text and binary forms both accept several spellings of true, false
    and "missing"; output always uses ``T``, ``F`` and ``?``.
    """

    format = "b1"
    array_type = BooleanArray
    vararray_type = ScalarVarArray
    default = False
    binary_question_mark = b"?"
    binary_true = b"T"
    binary_false = b"F"

    # Upper-cased TABLEDATA_ text -> (value, mask).
    _parse_mapping = {
        **dict.fromkeys(("TRUE", "1", "T"), (True, False)),
        **dict.fromkeys(("FALSE", "0", "F"), (False, False)),
        **dict.fromkeys(("\0", " ", "?", ""), (False, True)),
    }

    def parse(self, value, config=None, pos=None):
        if value == "" or value is False:
            return False, True
        try:
            return self._parse_mapping[value.upper()]
        except KeyError:
            vo_raise(E05, (value,), config, pos)

    def output(self, value, mask):
        if mask:
            return "?"
        return "T" if value else "F"

    def binparse(self, read):
        return self.binparse_value(ord(read(1)))

    # BINARY_ byte value -> (value, mask).  Iterating a bytes object
    # yields the integer code of each character.
    _binparse_mapping = {
        **dict.fromkeys(b"Tt1", (True, False)),
        **dict.fromkeys(b"Ff0", (False, False)),
        **dict.fromkeys(b"\0 ?", (False, True)),
    }

    def binparse_value(self, value):
        try:
            return self._binparse_mapping[value]
        except KeyError:
            vo_raise(E05, (value,))

    def binoutput(self, value, mask):
        if mask:
            return self.binary_question_mark
        return self.binary_true if value else self.binary_false
# Maps each supported FIELD ``datatype`` name to the converter class that
# handles it; get_converter() looks the class up here.
converter_mapping = {
"double": Double,
"float": Float,
"bit": Bit,
"boolean": Boolean,
"unsignedByte": UnsignedByte,
"short": Short,
"int": Int,
"long": Long,
"floatComplex": FloatComplex,
"doubleComplex": DoubleComplex,
"char": Char,
"unicodeChar": UnicodeChar,
}
def get_converter(field, config=None, pos=None):
    """
    Get an appropriate converter instance for a given field.

    Parameters
    ----------
    field : astropy.io.votable.tree.Field
    config : dict, optional
        Parser configuration dictionary
    pos : tuple
        Position in the input XML file.  Used for error messages.

    Returns
    -------
    converter : astropy.io.votable.converters.Converter

    Raises
    ------
    E06
        If ``field.datatype`` is not a known datatype.
    """
    if config is None:
        config = {}

    if field.datatype not in converter_mapping:
        vo_raise(E06, (field.datatype, field.ID), config, pos)

    cls = converter_mapping[field.datatype]
    converter = cls(field, config, pos)

    arraysize = field.arraysize

    # With numeric datatypes, special things need to happen for
    # arrays.
    if field.datatype not in ("char", "unicodeChar") and arraysize is not None:
        # endswith() rather than [-1] so an empty arraysize is treated
        # as a scalar instead of raising IndexError.
        if arraysize.endswith("*"):
            # Drop the variable (last) dimension; whatever precedes the
            # final "x" describes the fixed-size inner array.
            arraysize = arraysize[:-1]
            last_x = arraysize.rfind("x")
            if last_x == -1:
                arraysize = ""
            else:
                arraysize = arraysize[:last_x]
            fixed = False
        else:
            fixed = True

        if arraysize != "":
            arraysize = [int(x) for x in arraysize.split("x")]
            # The dimension list is reversed before being used as a numpy shape.
            arraysize.reverse()
        else:
            arraysize = []

        if arraysize != []:
            converter = converter.array_type(field, converter, arraysize, config, pos)

        if not fixed:
            converter = converter.vararray_type(
                field, converter, arraysize, config, pos
            )

    return converter
# Maps numpy dtype numbers (``dtype.num``) to VOTable datatype names.
# ``np.str_`` is used for the unicode string type because the
# ``np.unicode_`` alias no longer exists in NumPy 2.0.
numpy_dtype_to_field_mapping = {
    np.float64().dtype.num: "double",
    np.float32().dtype.num: "float",
    np.bool_().dtype.num: "bit",
    np.uint8().dtype.num: "unsignedByte",
    np.int16().dtype.num: "short",
    np.int32().dtype.num: "int",
    np.int64().dtype.num: "long",
    np.complex64().dtype.num: "floatComplex",
    np.complex128().dtype.num: "doubleComplex",
    np.str_().dtype.num: "unicodeChar",
    np.bytes_().dtype.num: "char",
}
def _all_matching_dtype(column):
first_dtype = False
first_shape = ()
for x in column:
if not isinstance(x, np.ndarray) or len(x) == 0:
continue
if first_dtype is False:
first_dtype = x.dtype
first_shape = x.shape[1:]
elif first_dtype != x.dtype:
return False, ()
elif first_shape != x.shape[1:]:
first_shape = ()
return first_dtype, first_shape
def numpy_to_votable_dtype(dtype, shape):
    """
    Translate a numpy dtype and shape into the attributes of a VOTable
    FIELD element of the corresponding type.

    Parameters
    ----------
    dtype : Numpy dtype instance
    shape : tuple

    Returns
    -------
    attributes : dict
        A dict containing 'datatype' and, for strings and non-empty
        shapes, 'arraysize' keys that can be set on a VOTable FIELD
        element.

    Raises
    ------
    TypeError
        If *dtype* has no VOTable equivalent.
    """
    if dtype.num not in numpy_dtype_to_field_mapping:
        raise TypeError(f"{dtype!r} can not be represented in VOTable")

    code = dtype.char
    if code == "S":
        # Byte strings: one byte per character.
        return {"datatype": "char", "arraysize": str(dtype.itemsize)}
    if code == "U":
        # Numpy unicode strings use four bytes per character.
        return {"datatype": "unicodeChar", "arraysize": str(dtype.itemsize // 4)}

    attributes = {"datatype": numpy_dtype_to_field_mapping[dtype.num]}
    if len(shape):
        attributes["arraysize"] = "x".join(map(str, shape))
    return attributes
def table_column_to_votable_datatype(column):
    """
    Given a `astropy.table.Column` instance, returns the attributes
    necessary to create a VOTable FIELD element that corresponds to
    the type of the column.

    This necessarily must perform some heuristics to determine the
    type of variable length arrays fields, since they are not directly
    supported by Numpy.

    If the column has dtype of "object", it performs the following
    tests:

       - If the column metadata records the original string datatype
         (``_votable_string_dtype``), a variable-length field of that
         datatype is created.

       - If the first element is a numpy array and all arrays share the
         same dtype, a variable-length array of that type is created,
         carrying the arrays' common trailing shape when there is one.

    If the dtype of the input is not understood (including an empty
    object column), it sets the data type to the most inclusive: a
    variable length unicodeChar array.

    Parameters
    ----------
    column : `astropy.table.Column` instance

    Returns
    -------
    attributes : dict
        A dict containing 'datatype' and 'arraysize' keys that can be
        set on a VOTable FIELD element.
    """
    votable_string_dtype = None
    if column.info.meta is not None:
        votable_string_dtype = column.info.meta.get("_votable_string_dtype")

    if column.dtype.char == "O":
        if votable_string_dtype is not None:
            return {"datatype": votable_string_dtype, "arraysize": "*"}
        # Check the length first: an empty column has no first element.
        elif len(column) > 0 and isinstance(column[0], np.ndarray):
            dtype, shape = _all_matching_dtype(column)
            if dtype is not False:
                result = numpy_to_votable_dtype(dtype, shape)
                if "arraysize" not in result:
                    result["arraysize"] = "*"
                else:
                    result["arraysize"] += "*"
                return result

        # All bets are off, do the most generic thing
        return {"datatype": "unicodeChar", "arraysize": "*"}

    # For fixed size string columns, datatype here will be unicodeChar,
    # but honor the original FIELD datatype if present.
    result = numpy_to_votable_dtype(column.dtype, column.shape[1:])
    if result["datatype"] == "unicodeChar" and votable_string_dtype == "char":
        result["datatype"] = "char"

    return result