# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""This module corresponds to the math directory in idlutils.
"""
import numpy as np
from numpy.linalg import svd
import astropy.utils as au
from .misc import djs_laxisnum
from ..median import median
[docs]class computechi2(object):
"""Solve the linear set of equations :math:`A x = b` using SVD.
The attributes of this class are all read-only properties, implemented
with :class:`~astropy.utils.decorators.lazyproperty`.
Parameters
----------
bvec : :class:`numpy.ndarray`
The :math:`b` vector in :math:`A x = b`. This vector has length
:math:`N`.
sqivar : :class:`numpy.ndarray`
The reciprocal of the errors in `bvec`. The name comes from the square
root of the inverse variance, which is what this is.
amatrix : :class:`numpy.ndarray`
The matrix :math:`A` in :math:`A x = b`.
The shape of this matrix is (:math:`N`, :math:`M`).
"""
def __init__(self, bvec, sqivar, amatrix):
"""Initialize the object and perform initial computations.
"""
#
# Save the inputs
#
# self.bvec = bvec
self.sqivar = sqivar
self.amatrix = amatrix
if len(amatrix.shape) > 1:
self.nstar = amatrix.shape[1]
else:
self.nstar = 1
self.bvec = bvec * sqivar
self.mmatrix = self.amatrix * np.tile(sqivar, self.nstar).reshape(
self.nstar, bvec.size).transpose()
mm = np.dot(self.mmatrix.T, self.mmatrix)
self.uu, self.ww, self.vv = svd(mm, full_matrices=False)
self.mmi = np.dot((self.vv.T / np.tile(self.ww, self.nstar).reshape(
self.nstar, self.nstar)), self.uu.T)
return
@au.lazyproperty
def acoeff(self):
"""(:class:`~numpy.ndarray`) The fit parameters, :math:`x`,
in :math:`A x = b`. This vector has length :math:`M`.
"""
return np.dot(self.mmi, np.dot(self.mmatrix.T, self.bvec))
@au.lazyproperty
def chi2(self):
"""(:class:`float <numpy.generic>`) The :math:`\chi^2` value of the fit.
"""
return np.sum((np.dot(self.mmatrix, self.acoeff) - self.bvec)**2)
@au.lazyproperty
def yfit(self):
"""(:class:`~numpy.ndarray`) The evaluated best-fit at each point.
This vector has length :math:`N`.
"""
return np.dot(self.amatrix, self.acoeff)
@au.lazyproperty
def dof(self):
"""(:class:`int <numpy.generic>`) The degrees of freedom of the fit.
This is the number of values of `bvec` that have `sqivar` > 0 minus
the number of fit paramaters, which is equal to :math:`M`.
"""
return (self.sqivar > 0).sum() - self.nstar
@au.lazyproperty
def covar(self):
"""(:class:`~numpy.ndarray`) The covariance matrix.
The shape of this matrix is (:math:`M`, :math:`M`).
"""
wwt = self.ww.copy()
wwt[self.ww > 0] = 1.0/self.ww[self.ww > 0]
covar = np.zeros((self.nstar, self.nstar), dtype=self.ww.dtype)
for i in range(self.nstar):
for j in range(i + 1):
covar[i, j] = np.sum(wwt * self.vv[:, i] * self.vv[:, j])
covar[j, i] = covar[i, j]
return covar
@au.lazyproperty
def var(self):
"""(:class:`~numpy.ndarray`) The variances of the fit.
This is identical to the diagonal of the covariance matrix.
This vector has length :math:`M`.
"""
return np.diag(self.covar)
[docs]def djs_reject(data, model, outmask=None, inmask=None, sigma=None,
invvar=None, lower=None, upper=None, maxdev=None,
maxrej=None, groupdim=None, groupsize=None, groupbadpix=False,
grow=0, sticky=False):
"""Routine to reject points when doing an iterative fit to data.
Parameters
----------
data : :class:`numpy.ndarray`
The data
model : :class:`numpy.ndarray`
The model, must have the same number of dimensions as `data`.
outmask : :class:`numpy.ndarray`, optional
Output mask, generated by a previous call to `djs_reject`. If not supplied,
this mask will be initialized to a mask that masks nothing. Although
this parameter is technically optional, it will almost always be set.
inmask : :class:`numpy.ndarray`, optional
Input mask. Bad points are marked with a value that evaluates to ``False``.
Must have the same number of dimensions as `data`.
sigma : :class:`numpy.ndarray`, optional
Standard deviation of the data, used to reject points based on the values
of `upper` and `lower`.
invvar : :class:`numpy.ndarray`, optional
Inverse variance of the data, used to reject points based on the values
of `upper` and `lower`. If both `sigma` and `invvar` are set, `invvar`
will be ignored.
lower : :class:`int` or :class:`float`, optional
If set, reject points with data < model - lower * sigma.
upper : :class:`int` or :class:`float`, optional
If set, reject points with data > model + upper * sigma.
maxdev : :class:`int` or :class:`float`, optional
If set, reject points with abs(data-model) > maxdev. It is permitted to
set all three of `lower`, `upper` and `maxdev`.
maxrej : :class:`int` or :class:`numpy.ndarray`, optional
Maximum number of points to reject in this iteration. If `groupsize` or
`groupdim` are set to arrays, this should be an array as well.
groupdim
To be documented.
groupsize
To be documented.
groupbadpix : :class:`bool`, optional
If set to ``True``, consecutive sets of bad pixels are considered groups,
overriding the values of `groupsize`.
grow : :class:`int`, optional
If set to a non-zero integer, N, the N nearest neighbors of rejected
pixels will also be rejected.
sticky : :class:`bool`, optional
If set to ``True``, pixels rejected in one iteration remain rejected in
subsequent iterations, even if the model changes.
Returns
-------
:class:`tuple`
A tuple containing a mask where rejected data values are ``False`` and
a boolean value set to ``True`` if `djs_reject` believes there is no
further rejection to be done.
Raises
------
:exc:`ValueError`
If dimensions of various inputs do not match.
"""
#
# Create outmask setting = 1 for good data.
#
if outmask is None:
outmask = np.ones(data.shape, dtype='bool')
else:
if data.shape != outmask.shape:
raise ValueError('Dimensions of data and outmask do not agree.')
#
# Check other inputs.
#
if model is None:
if inmask is not None:
outmask = inmask
return (outmask, False)
else:
if data.shape != model.shape:
raise ValueError('Dimensions of data and model do not agree.')
if inmask is not None:
if data.shape != inmask.shape:
raise ValueError('Dimensions of data and inmask do not agree.')
if maxrej is not None:
if groupdim is not None:
if len(maxrej) != len(groupdim):
raise ValueError('maxrej and groupdim must have the same number of elements.')
else:
groupdim = []
if groupsize is not None:
if len(maxrej) != len(groupsize):
raise ValueError('maxrej and groupsize must have the same number of elements.')
else:
groupsize = len(data)
if sigma is None and invvar is None:
if inmask is not None:
igood = (inmask & outmask).nonzero()[0]
else:
igood = outmask.nonzero()[0]
if len(igood > 1):
sigma = np.std(data[igood] - model[igood])
else:
sigma = 0
diff = data - model
#
# The working array is badness, which is set to zero for good points
# (or points already rejected), and positive values for bad points.
# The values determine just how bad a point is, either corresponding
# to the number of sigma above or below the fit, or to the number
# of multiples of maxdev away from the fit.
#
badness = np.zeros(outmask.shape, dtype=data.dtype)
#
# Decide how bad a point is according to lower.
#
if lower is not None:
if sigma is not None:
qbad = diff < (-lower * sigma)
badness += ((-diff/(sigma + (sigma == 0))) > 0) * qbad
else:
qbad = (diff * np.sqrt(invvar)) < -lower
badness += ((-diff * np.sqrt(invvar)) > 0) * qbad
#
# Decide how bad a point is according to upper.
#
if upper is not None:
if sigma is not None:
qbad = diff > (upper * sigma)
badness += ((diff/(sigma + (sigma == 0))) > 0) * qbad
else:
qbad = (diff * np.sqrt(invvar)) > upper
badness += ((diff * np.sqrt(invvar)) > 0) * qbad
#
# Decide how bad a point is according to maxdev.
#
if maxdev is not None:
qbad = np.absolute(diff) > maxdev
badness += np.absolute(diff) / maxdev * qbad
#
# Do not consider rejecting points that are already rejected by inmask.
# Do not consider rejecting points that are already rejected by outmask,
# if sticky is set.
#
if inmask is not None:
badness *= inmask
if sticky:
badness *= outmask
#
# Reject a maximum of maxrej (additional) points in all the data, or
# in each group as specified by groupsize, and optionally along each
# dimension specified by groupdim.
#
if maxrej is not None:
#
# Loop over each dimension of groupdim or loop once if not set.
#
for iloop in range(max(len(groupdim), 1)):
#
# Assign an index number in this dimension to each data point.
#
if len(groupdim) > 0:
yndim = len(ydata.shape)
if groupdim[iloop] > yndim:
raise ValueError('groupdim is larger than the number of dimensions for ydata.')
dimnum = djs_laxisnum(ydata.shape, iaxis=groupdim[iloop]-1)
else:
dimnum = np.asarray([0])
#
# Loop over each vector specified by groupdim. For example, if
# this is a 2-D array with groupdim=1, then loop over each
# column of the data. If groupdim=2, then loop over each row.
# If groupdim is not set, then use the whole image.
#
for ivec in range(max(dimnum)):
#
# At this point it is not possible that dimnum is not set.
#
indx = (dimnum == ivec).nonzero()[0]
#
# Within this group of points, break it down into groups
# of points specified by groupsize, if set.
#
nin = len(indx)
if groupbadpix:
goodtemp = badness == 0
groups_lower = (-1*np.diff(np.insert(goodtemp, 0, 1)) == 1).nonzero()[0]
groups_upper = (np.diff(np.append(goodtemp, 1)) == 1).nonzero()[0]
ngroups = len(groups_lower)
else:
#
# The IDL version of this test makes no sense because
# groupsize will always be set.
#
if 'groupsize' in kwargs:
ngroups = nin/groupsize + 1
groups_lower = np.arange(ngroups, dtype='i4')*groupsize
foo = (np.arange(ngroups, dtype='i4')+1)*groupsize
groups_upper = np.where(foo < nin, foo, nin) - 1
else:
ngroups = 1
groups_lower = [0, ]
groups_upper = [nin - 1, ]
for igroup in range(ngroups):
i1 = groups_lower[igroup]
i2 = groups_upper[igroup]
nii = i2 - i1 + 1
#
# Need the test that i1 != -1 below to prevent a crash
# condition, but why is it that we ever get groups
# without any points? Because this is badly-written,
# that's why.
#
if nii > 0 and i1 != -1:
jj = indx[i1:i2+1]
#
# Test if too many points rejected in this group.
#
if np.sum(badness[jj] != 0) > maxrej[iloop]:
isort = badness[jj].argsort()
#
# Make the following points good again.
#
badness[jj[isort[0:nii-maxrej[iloop]]]] = 0
i1 += groupsize[iloop]
#
# Now modify outmask, rejecting points specified by inmask=0, outmask=0
# if sticky is set, or badness > 0.
#
# print(badness)
newmask = badness == 0
# print(newmask)
if grow > 0:
rejects = newmask == 0
if rejects.any():
irejects = rejects.nonzero()[0]
for k in range(1, grow):
newmask[(irejects - k) > 0] = 0
newmask[(irejects + k) < (data.shape[0]-1)] = 0
if inmask is not None:
newmask = newmask & inmask
if sticky:
newmask = newmask & outmask
#
# Set qdone if the input outmask is identical to the output outmask;
# convert np.bool to Python built-in bool.
#
qdone = bool(np.all(newmask == outmask))
outmask = newmask
return (outmask, qdone)
[docs]def find_contiguous(x):
"""Find the longest sequence of contiguous non-zero array elements.
Parameters
----------
x : :class:`numpy.ndarray`
A 1d array. A dtype of bool is preferred although any dtype where the
operation ``if x[k]:`` is well-defined should work.
Returns
-------
:class:`list`
A list of indices of the longest contiguous non-zero sequence.
Examples
--------
>>> import numpy as np
>>> from pydl.pydlutils.math import find_contiguous
>>> find_contiguous(np.array([0,1,1,1,0,1,1,0,1]))
[1, 2, 3]
"""
contig = list()
for k in range(x.size):
if x[k]:
if len(contig) == 0:
contig.append([k])
else:
if k == contig[-1][-1]+1:
contig[-1].append(k)
else:
contig.append([k])
lengths = [len(c) for c in contig]
longest = contig[lengths.index(max(lengths))]
return longest