Source code for pydl.pydlutils.bspline

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""This module corresponds to the bspline directory in idlutils.
"""
from warnings import warn
import numpy as np
from numpy.linalg.linalg import LinAlgError
from scipy.linalg import cholesky_banded, cho_solve_banded
from . import PydlutilsUserWarning
from .math import djs_reject
from .trace import fchebyshev
from .. import uniq
from ..goddard.math import flegendre


[docs]class bspline(object): """B-spline class. Functions in the idlutils bspline library are implemented as methods on this class. Parameters ---------- x : :class:`numpy.ndarray` The data. nord : :class:`int`, optional The order of the B-spline. Default is 4, which is cubic. npoly : :class:`int`, optional Polynomial order to fit over 2nd variable, if supplied. If not supplied the order is 1. bkpt : :class:`numpy.ndarray`, optional To be documented. bkspread : :class:`float`, optional To be documented. Attributes ---------- breakpoints To be documented. nord To be documented. npoly To be documented. mask To be documented. coeff To be documented. icoeff To be documented. xmin To be documented. xmax To be documented. funcname To be documented. """ def __init__(self, x, nord=4, npoly=1, bkpt=None, bkspread=1.0, **kwargs): """Init creates an object whose attributes are similar to the structure returned by the ``create_bsplineset()`` function. """ # # Set the breakpoints. # if bkpt is None: startx = x.min() rangex = x.max() - startx if 'placed' in kwargs: w = ((kwargs['placed'] >= startx) & (kwargs['placed'] <= startx+rangex)) if w.sum() < 2: bkpt = np.arange(2, dtype='f') * rangex + startx else: bkpt = kwargs['placed'][w] elif 'bkspace' in kwargs: nbkpts = int(rangex/kwargs['bkspace']) + 1 if nbkpts < 2: nbkpts = 2 tempbkspace = rangex/float(nbkpts-1) bkpt = np.arange(nbkpts, dtype='f')*tempbkspace + startx elif 'nbkpts' in kwargs: nbkpts = kwargs['nbkpts'] if nbkpts < 2: nbkpts = 2 tempbkspace = rangex/float(nbkpts-1) bkpt = np.arange(nbkpts, dtype='f') * tempbkspace + startx elif 'everyn' in kwargs: npkpts = max(nx/kwargs['everyn'], 1) if nbkpts == 1: xspot = [0] else: xspot = int(nx/(nbkpts-1)) * np.arange(nbkpts, dtype='i4') bkpt = x[xspot].astype('f') else: raise ValueError('No information for bkpts.') imin = bkpt.argmin() imax = bkpt.argmax() if x.min() < bkpt[imin]: warn('Lowest breakpoint does not cover lowest x value: changing.', PydlutilsUserWarning) bkpt[imin] = x.min() if x.max() > bkpt[imax]: warn('Highest breakpoint does not cover highest x value: changing.', PydlutilsUserWarning) bkpt[imax] = x.max() nshortbkpt = bkpt.size fullbkpt = bkpt.copy() if nshortbkpt == 1: bkspace = np.float32(bkspread) else: bkspace = (bkpt[1] - bkpt[0]) * np.float32(bkspread) for i in np.arange(1, nord, dtype=np.float32): fullbkpt = np.insert(fullbkpt, 0, bkpt[0]-bkspace*i) fullbkpt = np.insert(fullbkpt, fullbkpt.shape[0], bkpt[nshortbkpt-1] + bkspace*i) # # Set the attributes # nc = fullbkpt.size - nord self.breakpoints = fullbkpt self.nord = nord self.npoly = npoly self.mask = np.ones((fullbkpt.size,), dtype='bool') if npoly > 1: self.coeff = np.zeros((npoly, nc), dtype='d') self.icoeff = np.zeros((npoly, nc), dtype='d') else: self.coeff = np.zeros((nc,), dtype='d') self.icoeff = np.zeros((nc,), dtype='d') self.xmin = 0.0 self.xmax = 1.0 self.funcname = 'legendre' return
[docs] def fit(self, xdata, ydata, invvar, x2=None): """Calculate a B-spline in the least-squares sense. Fit is based on two variables: `xdata` which is sorted and spans a large range where breakpoints are required `ydata` which can be described with a low order polynomial. Parameters ---------- xdata : :class:`numpy.ndarray` Independent variable. ydata : :class:`numpy.ndarray` Dependent variable. invvar : :class:`numpy.ndarray` Inverse variance of `ydata`. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. Returns ------- :class:`tuple` A tuple containing an integer error code, and the evaluation of the b-spline at the input values. An error code of -2 is a failure, -1 indicates dropped breakpoints, 0 is success, and positive integers indicate ill-conditioned breakpoints. """ goodbk = self.mask[self.nord:] nn = goodbk.sum() if nn < self.nord: yfit = np.zeros(ydata.shape, dtype='f') return (-2, yfit) nfull = nn * self.npoly bw = self.npoly * self.nord a1, lower, upper = self.action(xdata, x2=x2) foo = np.tile(invvar, bw).reshape(bw, invvar.size).transpose() a2 = a1 * foo alpha = np.zeros((bw, nfull+bw), dtype='d') beta = np.zeros((nfull+bw,), dtype='d') bi = np.arange(bw, dtype='i4') bo = np.arange(bw, dtype='i4') for k in range(1, bw): bi = np.append(bi, np.arange(bw-k, dtype='i4')+(bw+1)*k) bo = np.append(bo, np.arange(bw-k, dtype='i4')+bw*k) for k in range(nn-self.nord+1): itop = k*self.npoly ibottom = min(itop, nfull) + bw - 1 ict = upper[k] - lower[k] + 1 if ict > 0: work = np.dot(a1[lower[k]:upper[k]+1, :].T, a2[lower[k]:upper[k]+1, :]) wb = np.dot(ydata[lower[k]:upper[k]+1], a2[lower[k]:upper[k]+1, :]) alpha.T.flat[bo+itop*bw] += work.flat[bi] beta[itop:ibottom+1] += wb min_influence = 1.0e-10 * invvar.sum() / nfull errb = cholesky_band(alpha, mininf=min_influence) if isinstance(errb[0], int) and errb[0] == -1: a = errb[1] else: yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower) return (self.maskpoints(errb[0]), yfit) sol = cholesky_solve(a, beta) if self.npoly > 1: self.icoeff[:, goodbk] = np.array(a[0, 0:nfull].reshape(self.npoly, nn), dtype=a.dtype) self.coeff[:, goodbk] = np.array(sol[0:nfull].reshape(self.npoly, nn), dtype=sol.dtype) else: self.icoeff[goodbk] = np.array(a[0, 0:nfull], dtype=a.dtype) self.coeff[goodbk] = np.array(sol[0:nfull], dtype=sol.dtype) yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower) return (0, yfit)
[docs] def action(self, x, x2=None): """Construct banded B-spline matrix, with dimensions [ndata, bandwidth]. Parameters ---------- x : :class:`numpy.ndarray` Independent variable. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. Returns ------- :class:`tuple` A tuple containing the B-spline action matrix; the 'lower' parameter, a list of pixel positions, each corresponding to the first occurence of position greater than breakpoint indx; and 'upper', Same as lower, but denotes the upper pixel positions. """ nx = x.size nbkpt = self.mask.sum() if nbkpt < 2*self.nord: return (-2, 0, 0) n = nbkpt - self.nord gb = self.breakpoints[self.mask] bw = self.npoly*self.nord lower = np.zeros((n - self.nord + 1,), dtype='i4') upper = np.zeros((n - self.nord + 1,), dtype='i4') - 1 indx = self.intrv(x) bf1 = self.bsplvn(x, indx) action = bf1 aa = uniq(indx, np.arange(indx.size, dtype='i4')) upper[indx[aa]-self.nord+1] = aa rindx = indx[::-1] bb = uniq(rindx, np.arange(rindx.size, dtype='i4')) lower[rindx[bb]-self.nord+1] = nx - bb - 1 if x2 is not None: if x2.size != nx: raise ValueError('Dimensions of x and x2 do not match.') x2norm = 2.0 * (x2 - self.xmin) / (self.xmax - self.xmin) - 1.0 if self.funcname == 'poly': temppoly = np.ones((nx, self.npoly), dtype='f') for i in range(1, self.npoly): temppoly[:, i] = temppoly[:, i-1] * x2norm elif self.funcname == 'poly1': temppoly = np.tile(x2norm, self.npoly).reshape(nx, self.npoly) for i in range(1, self.npoly): temppoly[:, i] = temppoly[:, i-1] * x2norm elif self.funcname == 'chebyshev': temppoly = fchebyshev(x2norm, self.npoly) elif self.funcname == 'legendre': temppoly = flegendre(x2norm, self.npoly) else: raise ValueError('Unknown value of funcname.') action = np.zeros((nx, bw), dtype='d') counter = -1 for ii in range(self.nord): for jj in range(self.npoly): counter += 1 action[:, counter] = bf1[:, ii]*temppoly[:, jj] return (action, lower, upper)
[docs] def intrv(self, x): """Find the segment between breakpoints which contain each value in the array `x`. The minimum breakpoint is ``nbkptord - 1``, and the maximum is ``nbkpt - nbkptord - 1``. Parameters ---------- x : :class:`numpy.ndarray` Data values, assumed to be monotonically increasing. Returns ------- :class:`numpy.ndarray` Position of array elements with respect to breakpoints. """ gb = self.breakpoints[self.mask] n = gb.size - self.nord indx = np.zeros((x.size,), dtype='i4') ileft = self.nord - 1 for i in range(x.size): while x[i] > gb[ileft+1] and ileft < n - 1: ileft += 1 indx[i] = ileft return indx
[docs] def bsplvn(self, x, ileft): """Calculates the value of all possibly nonzero B-splines at `x` of a certain order. Parameters ---------- x : :class:`numpy.ndarray` Independent variable. ileft : :class:`int` Breakpoint segements that contain `x`. Returns ------- :class:`numpy.ndarray` B-spline values. """ bkpt = self.breakpoints[self.mask] vnikx = np.zeros((x.size, self.nord), dtype=x.dtype) deltap = vnikx.copy() deltam = vnikx.copy() j = 0 vnikx[:, 0] = 1.0 while j < self.nord - 1: ipj = ileft+j+1 deltap[:, j] = bkpt[ipj] - x imj = ileft-j deltam[:, j] = x - bkpt[imj] vmprev = 0.0 for l in range(j+1): vm = vnikx[:, l]/(deltap[:, l] + deltam[:, j-l]) vnikx[:, l] = vm*deltap[:, l] + vmprev vmprev = vm*deltam[:, j-l] j += 1 vnikx[:, j] = vmprev return vnikx
[docs] def value(self, x, x2=None, action=None, lower=None, upper=None): """Evaluate a B-spline at specified values. Parameters ---------- x : :class:`numpy.ndarray` Independent variable. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. action : :class:`numpy.ndarray`, optional Action matrix to use. If not supplied it is calculated. lower : :class:`numpy.ndarray`, optional If the action parameter is supplied, this parameter must also be supplied. upper : :class:`numpy.ndarray`, optional If the action parameter is supplied, this parameter must also be supplied. Returns ------- :class:`tuple` A tuple containing the results of the bspline evaluation and a mask indicating where the evaluation was good. """ xsort = x.argsort() xwork = x[xsort] if x2 is not None: x2work = x2[xsort] else: x2work = None if action is not None: if lower is None or upper is None: raise ValueError('Must specify lower and upper if action is set.') else: action, lower, upper = self.action(xwork, x2=x2work) yfit = np.zeros(x.shape, dtype=x.dtype) bw = self.npoly * self.nord spot = np.arange(bw, dtype='i4') goodbk = self.mask.nonzero()[0] coeffbk = self.mask[self.nord:].nonzero()[0] n = self.mask.sum() - self.nord if self.npoly > 1: goodcoeff = self.coeff[:, coeffbk] else: goodcoeff = self.coeff[coeffbk] # maskthis = np.zeros(xwork.shape,dtype=xwork.dtype) for i in range(n-self.nord+1): ict = upper[i] - lower[i] + 1 if ict > 0: yfit[lower[i]:upper[i]+1] = np.dot( action[lower[i]:upper[i]+1, :], goodcoeff[i*self.npoly+spot]) yy = yfit.copy() yy[xsort] = yfit mask = np.ones(x.shape, dtype='bool') gb = self.breakpoints[goodbk] outside = ((x < gb[self.nord-1]) | (x > gb[n])) if outside.any(): mask[outside] = False hmm = ((np.diff(goodbk) > 2).nonzero())[0] for jj in range(hmm.size): inside = ((x >= self.breakpoints[goodbk[hmm[jj]]]) & (x <= self.breakpoints[goodbk[hmm[jj]+1]-1])) if inside.any(): mask[inside] = False return (yy, mask)
[docs] def maskpoints(self, err): """Perform simple logic of which breakpoints to mask. Parameters ---------- err : :class:`numpy.ndarray` The list of indexes returned by the cholesky routines. Returns ------- :class:`int` An integer indicating the results of the masking. -1 indicates that the error points were successfully masked. -2 indicates failure; the calculation should be aborted. Notes ----- The mask attribute is modified, assuming it is possible to create the mask. """ nbkpt = self.mask.sum() if nbkpt <= 2*self.nord: return -2 hmm = err[np.unique(err/self.npoly)]/self.npoly n = nbkpt - self.nord if np.any(hmm >= n): return -2 test = np.zeros(nbkpt, dtype='bool') for jj in range(-np.ceil(nord/2.0), nord/2.0): foo = np.where((hmm+jj) > 0, hmm+jj, np.zeros(hmm.shape, dtype=hmm.dtype)) inside = np.where((foo+nord) < n-1, foo+nord, np.zeros(hmm.shape, dtype=hmm.dtype)+n-1) test[inside] = True if test.any(): reality = self.mask[test] if self.mask[reality].any(): self.mask[reality] = False return -1 else: return -2 else: return -2
[docs]def cholesky_band(l, mininf=0.0): """Compute *lower* Cholesky decomposition of a banded matrix. This function provides informative error messages to pass back to the :class:`~pydl.pydlutils.bspline.bspline` machinery; the actual computation is delegated to :func:`scipy.linalg.cholesky_banded`. Parameters ---------- l : :class:`numpy.ndarray` A matrix on which to perform the Cholesky decomposition. The matrix must be in a special, *lower* form described in :func:`scipy.linalg.cholesky_banded`. In addition, the input must be padded. If the original, square matrix has size :math:`N \\times N`, and the width of the band is :math:`b`, `l` must be :math:`b \\times (N + b)`. mininf : :class:`float`, optional Entries in the `l` matrix are considered negative if they are less than this value (default 0.0). Returns ------- :class:`tuple` If problems were detected, the first item will be the index or indexes where the problem was detected, and the second item will simply be the input matrix. If no problems were detected, the first item will be -1, and the second item will be the Cholesky decomposition. """ bw, nn = l.shape n = nn - bw negative = l[0, 0:n] <= mininf if negative.any() or not np.all(np.isfinite(l)): warn('Bad entries: ' + str(negative.nonzero()[0]), PydlutilsUserWarning) return (negative.nonzero()[0], l) try: lower = cholesky_banded(l[:, 0:n], lower=True) except LinAlgError: # # Figure out where the error is. # lower = l.copy() kn = bw - 1 spot = np.arange(kn, dtype='i4') + 1 for j in range(n): lower[0, j] = np.sqrt(lower[0, j]) lower[spot, j] /= lower[0, j] x = lower[spot, j] if not np.all(np.isfinite(x)): warn('NaN found in cholesky_band.', PydlutilsUserWarning) return (j, l) # # Restore padding. # L = np.zeros(l.shape, dtype=l.dtype) L[:, 0:n] = lower return (-1, L)
[docs]def cholesky_solve(a, bb): """Solve the equation :math:`A x = b` where `a` is a *lower* Cholesky-banded matrix. In the :class:`~pydl.pydlutils.bspline.bspline` machinery, `a` needs to be padded. This function should only used with the output of :func:`~pydl.pydlutils.bspline.cholesky_band`, to ensure the proper padding on `a`. Otherwise the computation is delegated to :func:`scipy.linalg.cho_solve_banded`. Parameters ---------- a : :class:`numpy.ndarray` *Lower* Cholesky decomposition of :math:`A` in :math:`A x = b`. bb : :class:`numpy.ndarray` :math:`b` in :math:`A x = b`. Returns ------- :class:`numpy.ndarray` The solution, padded to be the same shape as `bb`. """ bw = a.shape[0] n = bb.shape[0] - bw x = np.zeros(bb.shape, dtype=bb.dtype) x[0:n] = cho_solve_banded((a[:, 0:n], True), bb[0:n]) return x
[docs]def iterfit(xdata, ydata, invvar=None, upper=5, lower=5, x2=None, maxiter=10, **kwargs): """Iteratively fit a B-spline set to data, with rejection. Parameters ---------- xdata : :class:`numpy.ndarray` Independent variable. ydata : :class:`numpy.ndarray` Dependent variable. invvar : :class:`numpy.ndarray`, optional Inverse variance of `ydata`. If not set, it will be calculated based on the standard deviation. upper : :class:`int` or :class:`float`, optional Upper rejection threshold in units of sigma, defaults to 5 sigma. lower : :class:`int` or :class:`float`, optional Lower rejection threshold in units of sigma, defaults to 5 sigma. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. maxiter : :class:`int`, optional Maximum number of rejection iterations, default 10. Set this to zero to disable rejection. Returns ------- :class:`tuple` A tuple containing the fitted bspline object and an output mask. """ nx = xdata.size if ydata.size != nx: raise ValueError('Dimensions of xdata and ydata do not agree.') if invvar is not None: if invvar.size != nx: raise ValueError('Dimensions of xdata and invvar do not agree.') else: # # This correction to the variance makes it the same # as IDL's variance() # var = ydata.var()*(float(nx)/float(nx-1)) if var == 0: var = 1.0 invvar = np.ones(ydata.shape, dtype=ydata.dtype)/var if x2 is not None: if x2.size != nx: raise ValueError('Dimensions of xdata and x2 do not agree.') yfit = np.zeros(ydata.shape, dtype=ydata.dtype) if invvar.size == 1: outmask = True else: outmask = np.ones(invvar.shape, dtype='bool') xsort = xdata.argsort() maskwork = (outmask & (invvar > 0))[xsort] if 'oldset' in kwargs: sset = kwargs['oldset'] sset.mask = True sset.coeff = 0 else: if not maskwork.any(): raise ValueError('No valid data points.') if 'fullbkpt' in kwargs: fullbkpt = kwargs['fullbkpt'] else: sset = bspline(xdata[xsort[maskwork]], **kwargs) if maskwork.sum() < sset.nord: warn('Number of good data points fewer than nord.', PydlutilsUserWarning) return (sset, outmask) if x2 is not None: if 'xmin' in kwargs: xmin = kwargs['xmin'] else: xmin = x2.min() if 'xmax' in kwargs: xmax = kwargs['xmax'] else: xmax = x2.max() if xmin == xmax: xmax = xmin + 1 sset.xmin = xmin sset.xmax = xmax if 'funcname' in kwargs: sset.funcname = kwargs['funcname'] xwork = xdata[xsort] ywork = ydata[xsort] invwork = invvar[xsort] if x2 is not None: x2work = x2[xsort] else: x2work = None iiter = 0 error = 0 qdone = -1 while (error != 0 or qdone == -1) and iiter <= maxiter: goodbk = sset.mask.nonzero()[0] if maskwork.sum() <= 1 or not sset.mask.any(): sset.coeff = 0 iiter = maxiter + 1 else: if 'requiren' in kwargs: i = 0 while xwork[i] < sset.breakpoints[goodbk[sset.nord]] and i < nx-1: i += 1 ct = 0 for ileft in range(sset.nord, sset.mask.sum()-sset.nord+1): while (xwork[i] >= sset.breakpoints[goodbk[ileft]] and xwork[i] < sset.breakpoints[goodbk[ileft+1]] and i < nx-1): ct += invwork[i]*maskwork[i] > 0 i += 1 if ct >= kwargs['requiren']: ct = 0 else: sset.mask[goodbk[ileft]] = False error, yfit = sset.fit(xwork, ywork, invwork*maskwork, x2=x2work) iiter += 1 inmask = maskwork if error == -2: return (sset, outmask) elif error == 0: maskwork, qdone = djs_reject(ywork, yfit, invvar=invwork, inmask=inmask, outmask=maskwork, upper=upper, lower=lower) else: pass outmask[xsort] = maskwork temp = yfit yfit[xsort] = temp return (sset, outmask)