# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Various utilities and cookbook-like things.
"""
# STDLIB
import codecs
import contextlib
import gzip
import io
import os
import re
from packaging.version import Version
__all__ = [
"convert_to_writable_filelike",
"stc_reference_frames",
"coerce_range_list_param",
]
[docs]@contextlib.contextmanager
def convert_to_writable_filelike(fd, compressed=False):
"""
Returns a writable file-like object suitable for streaming output.
Parameters
----------
fd : str or file-like
May be:
- a file path string, in which case it is opened, and the file
object is returned.
- an object with a :meth:``write`` method, in which case that
object is returned.
compressed : bool, optional
If `True`, create a gzip-compressed file. (Default is `False`).
Returns
-------
fd : writable file-like
"""
if isinstance(fd, str):
fd = os.path.expanduser(fd)
if fd.endswith(".gz") or compressed:
with gzip.GzipFile(fd, "wb") as real_fd:
encoded_fd = io.TextIOWrapper(real_fd, encoding="utf8")
yield encoded_fd
encoded_fd.flush()
real_fd.flush()
return
else:
with open(fd, "w", encoding="utf8") as real_fd:
yield real_fd
return
elif hasattr(fd, "write"):
assert callable(fd.write)
if compressed:
fd = gzip.GzipFile(fileobj=fd)
# If we can't write Unicode strings, use a codecs.StreamWriter
# object
needs_wrapper = False
try:
fd.write("")
except TypeError:
needs_wrapper = True
if not hasattr(fd, "encoding") or fd.encoding is None:
needs_wrapper = True
if needs_wrapper:
yield codecs.getwriter("utf-8")(fd)
fd.flush()
else:
yield fd
fd.flush()
return
else:
raise TypeError("Can not be coerced to writable file-like object")
# <http://www.ivoa.net/documents/REC/DM/STC-20071030.html>
stc_reference_frames = {
"FK4",
"FK5",
"ECLIPTIC",
"ICRS",
"GALACTIC",
"GALACTIC_I",
"GALACTIC_II",
"SUPER_GALACTIC",
"AZ_EL",
"BODY",
"GEO_C",
"GEO_D",
"MAG",
"GSE",
"GSM",
"SM",
"HGC",
"HGS",
"HEEQ",
"HRTN",
"HPC",
"HPR",
"HCC",
"HGI",
"MERCURY_C",
"VENUS_C",
"LUNA_C",
"MARS_C",
"JUPITER_C_III",
"SATURN_C_III",
"URANUS_C_III",
"NEPTUNE_C_III",
"PLUTO_C",
"MERCURY_G",
"VENUS_G",
"LUNA_G",
"MARS_G",
"JUPITER_G_III",
"SATURN_G_III",
"URANUS_G_III",
"NEPTUNE_G_III",
"PLUTO_G",
"UNKNOWNFrame",
}
[docs]def coerce_range_list_param(p, frames=None, numeric=True):
"""
Coerces and/or verifies the object *p* into a valid range-list-format parameter.
As defined in `Section 8.7.2 of Simple
Spectral Access Protocol
<http://www.ivoa.net/documents/REC/DAL/SSA-20080201.html>`_.
Parameters
----------
p : str or sequence
May be a string as passed verbatim to the service expecting a
range-list, or a sequence. If a sequence, each item must be
either:
- a numeric value
- a named value, such as, for example, 'J' for named
spectrum (if the *numeric* kwarg is False)
- a 2-tuple indicating a range
- the last item my be a string indicating the frame of
reference
frames : sequence of str, optional
A sequence of acceptable frame of reference keywords. If not
provided, the default set in ``set_reference_frames`` will be
used.
numeric : bool, optional
TODO
Returns
-------
parts : tuple
The result is a tuple:
- a string suitable for passing to a service as a range-list
argument
- an integer counting the number of elements
"""
def str_or_none(x):
if x is None:
return ""
if numeric:
x = float(x)
return str(x)
def numeric_or_range(x):
if isinstance(x, tuple) and len(x) == 2:
return f"{str_or_none(x[0])}/{str_or_none(x[1])}"
else:
return str_or_none(x)
def is_frame_of_reference(x):
return isinstance(x, str)
if p is None:
return None, 0
elif isinstance(p, (tuple, list)):
has_frame_of_reference = len(p) > 1 and is_frame_of_reference(p[-1])
if has_frame_of_reference:
points = p[:-1]
else:
points = p[:]
out = ",".join([numeric_or_range(x) for x in points])
length = len(points)
if has_frame_of_reference:
if frames is not None and p[-1] not in frames:
raise ValueError(f"'{p[-1]}' is not a valid frame of reference")
out += ";" + p[-1]
length += 1
return out, length
elif isinstance(p, str):
number = r"([-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)?"
if not numeric:
number = r"(" + number + ")|([A-Z_]+)"
match = re.match(
"^" + number + r"([,/]" + number + r")+(;(?P<frame>[<A-Za-z_0-9]+))?$", p
)
if match is None:
raise ValueError(f"'{p}' is not a valid range list")
frame = match.groupdict()["frame"]
if frames is not None and frame is not None and frame not in frames:
raise ValueError(f"{frame!r} is not a valid frame of reference")
return p, p.count(",") + p.count(";") + 1
try:
float(p)
return str(p), 1
except TypeError:
raise ValueError(f"'{p}' is not a valid range list")
def version_compare(a, b):
"""
Compare two VOTable version identifiers.
"""
def version_to_tuple(v):
if v[0].lower() == "v":
v = v[1:]
return Version(v)
av = version_to_tuple(a)
bv = version_to_tuple(b)
# Can't use cmp because it was removed from Python 3.x
return (av > bv) - (av < bv)