# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""This module corresponds to the trace directory in idlutils.
Traceset (or "trace set"):
A set of coefficient vectors defining a set of functions
over a common independent-variable domain specified by ``xmin``
and ``xmax`` values. The functions in the set are defined in
terms of a linear combination of basis functions (such as
Legendre of Chebyshev polynonials) up to a specified maximum
order, weighted by the values in the coefficient vectors, and
evaluated using a suitable affine rescaling of the
1dependent-variable domain (such as ``[xmin, xmax] -> [-1, 1]``
for Legendre polynomials). For evaluation purposes, the
domain is assumed by default to be a zero-based integer
baseline from ``xmin`` to ``xmax`` such as would correspond to a
digital detector pixel grid.
Etymology:
From the original motivating use case of encoding the "traces"
of multiple spectra across the detector in a multi-fiber spectrograph.
Definition from A. Bolton, U. of Utah, June 2015.
"""
import numpy as np
from scipy.special import chebyt
from astropy.io.fits.fitsrec import FITS_rec
from . import PydlutilsException
from .math import djs_reject
from .misc import djs_laxisgen
from ..goddard.math import flegendre
[docs]def fchebyshev(x, m):
"""Compute the first `m` Chebyshev polynomials.
Parameters
----------
x : array-like
Compute the Chebyshev polynomials at these abscissa values.
m : :class:`int`
The number of Chebyshev polynomials to compute. For example, if
:math:`m = 3`, :math:`T_0 (x)`, :math:`T_1 (x)` and
:math:`T_2 (x)` will be computed.
Returns
-------
:class:`numpy.ndarray`
"""
if isinstance(x, np.ndarray):
n = x.size
else:
n = 1
if m < 1:
raise ValueError('Order of Chebyshev polynomial must be at least 1.')
try:
dt = x.dtype
except AttributeError:
dt = np.float64
leg = np.ones((m, n), dtype=dt)
if m >= 2:
leg[1, :] = x
if m >= 3:
for k in range(2, m):
leg[k, :] = np.polyval(chebyt(k), x)
return leg
[docs]def fchebyshev_split(x, m):
"""Compute the first `m` Chebyshev polynomials, but modified to allow a
split in the baseline at :math:`x=0`. The intent is to allow a model fit
where a constant term is different for positive and negative `x`.
Parameters
----------
x : array-like
Compute the Chebyshev polynomials at these abscissa values.
m : :class:`int`
The number of Chebyshev polynomials to compute. For example, if
:math:`m = 3`, :math:`T_0 (x)`, :math:`T_1 (x)` and
:math:`T_2 (x)` will be computed.
Returns
-------
:class:`numpy.ndarray`
"""
if isinstance(x, np.ndarray):
n = x.size
else:
n = 1
if m < 2:
raise ValueError('Order of polynomial must be at least 2.')
try:
dt = x.dtype
except AttributeError:
dt = np.float64
leg = np.ones((m, n), dtype=dt)
try:
leg[0, :] = (x >= 0).astype(x.dtype)
except AttributeError:
leg[0, :] = np.double(x >= 0)
if m > 2:
leg[2, :] = x
if m > 3:
for k in range(3, m):
leg[k, :] = 2.0 * x * leg[k-1, :] - leg[k-2, :]
return leg
[docs]def fpoly(x, m):
"""Compute the first `m` simple polynomials.
Parameters
----------
x : array-like
Compute the simple polynomials at these abscissa values.
m : :class:`int`
The number of simple polynomials to compute. For example, if
:math:`m = 3`, :math:`x^0`, :math:`x^1` and
:math:`x^2` will be computed.
Returns
-------
:class:`numpy.ndarray`
"""
if isinstance(x, np.ndarray):
n = x.size
else:
n = 1
if m < 1:
raise ValueError('Order of polynomial must be at least 1.')
try:
dt = x.dtype
except AttributeError:
dt = np.float64
leg = np.ones((m, n), dtype=dt)
if m >= 2:
leg[1, :] = x
if m >= 3:
for k in range(2, m):
leg[k, :] = leg[k-1, :] * x
return leg
[docs]def func_fit(x, y, ncoeff, invvar=None, function_name='legendre', ia=None,
inputans=None, inputfunc=None):
"""Fit `x`, `y` positions to a functional form.
Parameters
----------
x : array-like
X values (independent variable).
y : array-like
Y values (dependent variable).
ncoeff : :class:`int`
Number of coefficients to fit.
invvar : array-like, optional
Weight values; inverse variance.
function_name : :class:`str`, optional
Function name, default 'legendre'.
ia : array-like, optional
An array of bool of length `ncoeff` specifying free (``True``) and
fixed (``False``) parameters.
inputans : array-like, optional
An array of values of length `ncoeff` specifying the values of
the fixed parameters.
inputfunc : array-like, optional
Multiply the function fit by these values.
Returns
-------
:class:`tuple` of array-like
Fit coefficients, length `ncoeff`; fitted values.
Raises
------
:exc:`KeyError`
If an invalid function type is selected.
:exc:`ValueError`
If input dimensions do not agree.
"""
if x.shape != y.shape:
raise ValueError('Dimensions of X and Y do not agree!')
if invvar is None:
invvar = np.ones(x.shape, dtype=x.dtype)
else:
if invvar.shape != x.shape:
raise ValueError('Dimensions of X and invvar do not agree!')
if ia is None:
ia = np.ones((ncoeff,), dtype=bool)
if not ia.all():
if inputans is None:
inputans = np.zeros((ncoeff,), dtype=x.dtype)
#
# Select unmasked points
#
igood = (invvar > 0).nonzero()[0]
ngood = len(igood)
res = np.zeros((ncoeff,), dtype=x.dtype)
yfit = np.zeros(x.shape, dtype=x.dtype)
if ngood == 0:
pass
elif ngood == 1:
res[0] = y[igood[0]]
yfit += y[igood[0]]
else:
ncfit = min(ngood, ncoeff)
function_map = {
'legendre': flegendre,
'flegendre': flegendre,
'chebyshev': fchebyshev,
'fchebyshev': fchebyshev,
'chebyshev_split': fchebyshev_split,
'fchebyshev_split': fchebyshev_split,
'poly': fpoly,
'fpoly': fpoly
}
try:
legarr = function_map[function_name](x, ncfit)
except KeyError:
raise KeyError('Unknown function type: {0}'.format(function_name))
if inputfunc is not None:
if inputfunc.shape != x.shape:
raise ValueError('Dimensions of X and inputfunc do not agree!')
legarr *= np.tile(inputfunc, ncfit).reshape(ncfit, x.shape[0])
yfix = np.zeros(x.shape, dtype=x.dtype)
nonfix = ia[0:ncfit].nonzero()[0]
nparams = len(nonfix)
fixed = (~ia[0:ncfit]).nonzero()[0]
if len(fixed) > 0:
yfix = np.dot(legarr.T, inputans * (1 - ia))
ysub = y - yfix
finalarr = legarr[nonfix, :]
else:
finalarr = legarr
ysub = y
# extra2 = finalarr * np.outer(np.ones((nparams,), dtype=x.dtype),
# (invvar > 0))
extra2 = finalarr * np.outer(np.ones((nparams,), dtype=x.dtype),
invvar)
alpha = np.dot(finalarr, extra2.T)
# assert alpha.dtype == x.dtype
if nparams > 1:
# beta = np.dot(ysub * (invvar > 0), finalarr.T)
beta = np.dot(ysub * invvar, finalarr.T)
assert beta.dtype == x.dtype
# uu,ww,vv = np.linalg.svd(alpha, full_matrices=False)
res[nonfix] = np.linalg.solve(alpha, beta)
else:
# res[nonfix] = (ysub * (invvar > 0) * finalarr).sum()/alpha
res[nonfix] = (ysub * invvar * finalarr).sum()/alpha
if len(fixed) > 0:
res[fixed] = inputans[fixed]
yfit = np.dot(legarr.T, res[0:ncfit])
return (res, yfit)
[docs]class TraceSet(object):
"""Implements the idea of a trace set.
Attributes
----------
func : :class:`str`
Name of function type used to fit the trace set.
xmin : float-like
Minimum x value.
xmax : float-like
Maximum x value.
coeff : array-like
Coefficients of the trace set fit.
nTrace : :class:`int`
Number of traces in the object.
ncoeff : :class:`int`
Number of coefficients of the trace set fit.
xjumplo : float-like
Jump value, for BOSS readouts.
xjumphi : float-like
Jump value, for BOSS readouts.
xjumpval : float-like
Jump value, for BOSS readouts.
outmask : array-like
When initialized with x,y positions, this contains the rejected
points.
yfit : array-like
When initialized with x,y positions, this contains the fitted y
values.
"""
_func_map = {'poly': fpoly, 'legendre': flegendre,
'chebyshev': fchebyshev}
def __init__(self, *args, **kwargs):
"""This class can be initialized either with a set of xy positions,
or with a trace set HDU from a FITS file.
"""
if len(args) == 1 and isinstance(args[0], FITS_rec):
#
# Initialize with FITS data
#
self.func = args[0]['FUNC'][0]
self.xmin = args[0]['XMIN'][0]
self.xmax = args[0]['XMAX'][0]
self.coeff = args[0]['COEFF'][0]
self.nTrace = self.coeff.shape[0]
self.ncoeff = self.coeff.shape[1]
if 'XJUMPLO' in args[0].dtype.names:
self.xjumplo = args[0]['XJUMPLO'][0]
self.xjumphi = args[0]['XJUMPHI'][0]
self.xjumpval = args[0]['XJUMPVAL'][0]
else:
self.xjumplo = None
self.xjumphi = None
self.xjumpval = None
self.outmask = None
self.yfit = None
elif len(args) == 2:
#
# Initialize with x, y positions.
#
xpos = args[0]
ypos = args[1]
self.nTrace = xpos.shape[0]
if 'invvar' in kwargs:
invvar = kwargs['invvar']
else:
invvar = np.ones(xpos.shape, dtype=xpos.dtype)
if 'func' in kwargs:
self.func = kwargs['func']
else:
self.func = 'legendre'
if 'ncoeff' in kwargs:
self.ncoeff = int(kwargs['ncoeff'])
else:
self.ncoeff = 3
if 'xmin' in kwargs:
self.xmin = np.float64(kwargs['xmin'])
else:
self.xmin = xpos.min()
if 'xmax' in kwargs:
self.xmax = np.float64(kwargs['xmax'])
else:
self.xmax = xpos.max()
if 'maxiter' in kwargs:
maxiter = int(kwargs['maxiter'])
else:
maxiter = 10
if 'inmask' in kwargs:
inmask = kwargs['inmask']
else:
inmask = np.ones(xpos.shape, dtype=bool)
do_jump = False
if 'xjumplo' in kwargs:
do_jump = True
self.xjumplo = np.float64(kwargs['xjumplo'])
else:
self.xjumplo = None
if 'xjumphi' in kwargs:
self.xjumphi = np.float64(kwargs['xjumphi'])
else:
self.xjumphi = None
if 'xjumpval' in kwargs:
self.xjumpval = np.float64(kwargs['xjumpval'])
else:
self.xjumpval = None
self.coeff = np.zeros((self.nTrace, self.ncoeff), dtype=xpos.dtype)
self.outmask = np.zeros(xpos.shape, dtype=bool)
self.yfit = np.zeros(xpos.shape, dtype=xpos.dtype)
for iTrace in range(self.nTrace):
xvec = self.xnorm(xpos[iTrace, :], do_jump)
iIter = 0
qdone = False
tempivar = (invvar[iTrace, :] *
inmask[iTrace, :].astype(invvar.dtype))
thismask = tempivar > 0
while (not qdone) and (iIter <= maxiter):
res, ycurfit = func_fit(xvec, ypos[iTrace, :], self.ncoeff,
invvar=tempivar, function_name=self.func)
thismask, qdone = djs_reject(ypos[iTrace, :], ycurfit,
invvar=tempivar)
iIter += 1
self.yfit[iTrace, :] = ycurfit
self.coeff[iTrace, :] = res
self.outmask[iTrace, :] = thismask
else:
raise PydlutilsException("Wrong number of arguments to TraceSet!")
[docs] def xy(self, xpos=None, ignore_jump=False):
"""Convert from a trace set to an array of x,y positions.
Parameters
----------
xpos : array-like, optional
If provided, evaluate the trace set at these positions. Otherwise
the positions will be constructed from the trace set object iself.
ignore_jump : :class:`bool`, optional
If ``True``, ignore any jump information in the `tset` object
Returns
-------
:class:`tuple` of array-like
The x, y positions.
"""
do_jump = self.has_jump and (not ignore_jump)
if xpos is None:
xpos = djs_laxisgen([self.nTrace, self.nx], iaxis=1) + self.xmin
ypos = np.zeros(xpos.shape, dtype=xpos.dtype)
for iTrace in range(self.nTrace):
xvec = self.xnorm(xpos[iTrace, :], do_jump)
legarr = self._func_map[self.func](xvec, self.ncoeff)
ypos[iTrace, :] = np.dot(legarr.T, self.coeff[iTrace, :])
return (xpos, ypos)
@property
def has_jump(self):
"""``True`` if jump conditions are set.
"""
return self.xjumplo is not None
@property
def xRange(self):
"""Range of x values.
"""
return self.xmax - self.xmin
@property
def nx(self):
"""Number of x values.
"""
return int(self.xRange + 1)
@property
def xmid(self):
"""Midpoint of x values.
"""
return 0.5 * (self.xmin + self.xmax)
[docs] def xnorm(self, xinput, jump):
"""Convert input x coordinates to normalized coordinates suitable
for input to special polynomials.
Parameters
----------
xinput : array-like
Input coordinates.
jump : :class:`bool`
Set to ``True`` if there is a jump.
Returns
-------
array-like
Normalized coordinates.
"""
if jump:
# Vector specifying what fraction of the jump has passed:
jfrac = np.minimum(np.maximum(((xinput - self.xjumplo) /
(self.xjumphi - self.xjumplo)), 0.), 1.)
# Conversion to "natural" x baseline:
xnatural = xinput + jfrac * self.xjumpval
else:
xnatural = xinput
return 2.0 * (xnatural - self.xmid)/self.xRange
[docs]def traceset2xy(tset, xpos=None, ignore_jump=False):
"""Convert from a trace set to an array of x,y positions.
Parameters
----------
tset : :class:`TraceSet`
A :class:`TraceSet` object.
xpos : array-like, optional
If provided, evaluate the trace set at these positions. Otherwise
the positions will be constructed from the trace set object iself.
ignore_jump : bool, optional
If ``True``, ignore any jump information in the `tset` object
Returns
-------
:class:`tuple` of array-like
The x, y positions.
"""
return tset.xy(xpos, ignore_jump)
[docs]def xy2traceset(xpos, ypos, **kwargs):
"""Convert from x,y positions to a trace set.
Parameters
----------
xpos, ypos : array-like
X,Y positions corresponding as [nx,Ntrace] arrays.
invvar : array-like, optional
Inverse variances for fitting.
func : :class:`str`, optional
Function type for fitting; defaults to 'legendre'.
ncoeff : :class:`int`, optional
Number of coefficients to fit. Defaults to 3.
xmin, xmax : :class:`float`, optional
Explicitly set minimum and maximum values, instead of computing
them from `xpos`.
maxiter : :class:`int`, optional
Maximum number of rejection iterations; set to 0 for no rejection;
default to 10.
inmask : array-like, optional
Mask set to 1 for good points and 0 for rejected points;
same dimensions as `xpos`, `ypos`. Points rejected by `inmask`
are always rejected from the fits (the rejection is "sticky"),
and will also be marked as rejected in the outmask attribute.
ia, inputans, inputfunc : array-like, optional
These arguments will be passed to :func:`func_fit`.
xjumplo : :class:`float`, optional
x position locating start of an x discontinuity
xjumphi : :class:`float`, optional
x position locating end of that x discontinuity
xjumpval : :class:`float`, optional
magnitude of the discontinuity "jump" between those bounds
(previous 3 keywords motivated by BOSS 2-phase readout)
Returns
-------
:class:`TraceSet`
A :class:`TraceSet` object.
"""
return TraceSet(xpos, ypos, **kwargs)