# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Spline models and fitters."""
# pylint: disable=line-too-long, too-many-lines, too-many-arguments, invalid-name
import abc
import functools
import warnings
import numpy as np
from astropy.utils import isiterable
from astropy.utils.exceptions import AstropyUserWarning
from .core import FittableModel, ModelDefinitionError
from .parameters import Parameter
__all__ = [
"Spline1D",
"SplineInterpolateFitter",
"SplineSmoothingFitter",
"SplineExactKnotsFitter",
"SplineSplrepFitter",
]
__doctest_requires__ = {"Spline1D": ["scipy"]}
class _Spline(FittableModel):
"""Base class for spline models"""
_knot_names = ()
_coeff_names = ()
optional_inputs = {}
def __init__(
self,
knots=None,
coeffs=None,
degree=None,
bounds=None,
n_models=None,
model_set_axis=None,
name=None,
meta=None,
):
super().__init__(
n_models=n_models, model_set_axis=model_set_axis, name=name, meta=meta
)
self._user_knots = False
self._init_tck(degree)
# Hack to allow an optional model argument
self._create_optional_inputs()
if knots is not None:
self._init_spline(knots, coeffs, bounds)
elif coeffs is not None:
raise ValueError(
"If one passes a coeffs vector one needs to also pass knots!"
)
@property
def param_names(self):
"""
Coefficient names generated based on the spline's degree and
number of knots.
"""
return tuple(list(self._knot_names) + list(self._coeff_names))
@staticmethod
def _optional_arg(arg):
return f"_{arg}"
def _create_optional_inputs(self):
for arg in self.optional_inputs:
attribute = self._optional_arg(arg)
if hasattr(self, attribute):
raise ValueError(
f"Optional argument {arg} already exists in this class!"
)
else:
setattr(self, attribute, None)
def _intercept_optional_inputs(self, **kwargs):
new_kwargs = kwargs
for arg in self.optional_inputs:
if arg in kwargs:
attribute = self._optional_arg(arg)
if getattr(self, attribute) is None:
setattr(self, attribute, kwargs[arg])
del new_kwargs[arg]
else:
raise RuntimeError(
f"{arg} has already been set, something has gone wrong!"
)
return new_kwargs
def evaluate(self, *args, **kwargs):
"""Extract the optional kwargs passed to call"""
optional_inputs = kwargs
for arg in self.optional_inputs:
attribute = self._optional_arg(arg)
if arg in kwargs:
# Options passed in
optional_inputs[arg] = kwargs[arg]
elif getattr(self, attribute) is not None:
# No options passed in and Options set
optional_inputs[arg] = getattr(self, attribute)
setattr(self, attribute, None)
else:
# No options passed in and No options set
optional_inputs[arg] = self.optional_inputs[arg]
return optional_inputs
def __call__(self, *args, **kwargs):
"""
Make model callable to model evaluation
"""
# Hack to allow an optional model argument
kwargs = self._intercept_optional_inputs(**kwargs)
return super().__call__(*args, **kwargs)
def _create_parameter(self, name: str, index: int, attr: str, fixed=False):
"""
Create a spline parameter linked to an attribute array.
Parameters
----------
name : str
Name for the parameter
index : int
The index of the parameter in the array
attr : str
The name for the attribute array
fixed : optional, bool
If the parameter should be fixed or not
"""
# Hack to allow parameters and attribute array to freely exchange values
# _getter forces reading value from attribute array
# _setter forces setting value to attribute array
def _getter(value, model: "_Spline", index: int, attr: str):
return getattr(model, attr)[index]
def _setter(value, model: "_Spline", index: int, attr: str):
getattr(model, attr)[index] = value
return value
getter = functools.partial(_getter, index=index, attr=attr)
setter = functools.partial(_setter, index=index, attr=attr)
default = getattr(self, attr)
param = Parameter(
name=name, default=default[index], fixed=fixed, getter=getter, setter=setter
)
# setter/getter wrapper for parameters in this case require the
# parameter to have a reference back to its parent model
param.model = self
param.value = default[index]
# Add parameter to model
self.__dict__[name] = param
def _create_parameters(self, base_name: str, attr: str, fixed=False):
"""
Create a spline parameters linked to an attribute array for all
elements in that array
Parameters
----------
base_name : str
Base name for the parameters
attr : str
The name for the attribute array
fixed : optional, bool
If the parameters should be fixed or not
"""
names = []
for index in range(len(getattr(self, attr))):
name = f"{base_name}{index}"
names.append(name)
self._create_parameter(name, index, attr, fixed)
return tuple(names)
@abc.abstractmethod
def _init_parameters(self):
raise NotImplementedError("This needs to be implemented")
@abc.abstractmethod
def _init_data(self, knots, coeffs, bounds=None):
raise NotImplementedError("This needs to be implemented")
def _init_spline(self, knots, coeffs, bounds=None):
self._init_data(knots, coeffs, bounds)
self._init_parameters()
# fill _parameters and related attributes
self._initialize_parameters((), {})
self._initialize_slices()
# Calling this will properly fill the _parameter vector, which is
# used directly sometimes without being properly filled.
_ = self.parameters
def _init_tck(self, degree):
self._c = None
self._t = None
self._degree = degree
[docs]class Spline1D(_Spline):
"""
One dimensional Spline Model
Parameters
----------
knots : optional
Define the knots for the spline. Can be 1) the number of interior
knots for the spline, 2) the array of all knots for the spline, or
3) If both bounds are defined, the interior knots for the spline
coeffs : optional
The array of knot coefficients for the spline
degree : optional
The degree of the spline. It must be 1 <= degree <= 5, default is 3.
bounds : optional
The upper and lower bounds of the spline.
Notes
-----
Much of the functionality of this model is provided by
`scipy.interpolate.BSpline` which can be directly accessed via the
bspline property.
Fitting for this model is provided by wrappers for:
`scipy.interpolate.UnivariateSpline`,
`scipy.interpolate.InterpolatedUnivariateSpline`,
and `scipy.interpolate.LSQUnivariateSpline`.
If one fails to define any knots/coefficients, no parameters will
be added to this model until a fitter is called. This is because
some of the fitters for splines vary the number of parameters and so
we cannot define the parameter set until after fitting in these cases.
Since parameters are not necessarily known at model initialization,
setting model parameters directly via the model interface has been
disabled.
Direct constructors are provided for this model which incorporate the
fitting to data directly into model construction.
Knot parameters are declared as "fixed" parameters by default to
enable the use of other `astropy.modeling` fitters to be used to
fit this model.
Examples
--------
>>> import numpy as np
>>> from astropy.modeling.models import Spline1D
>>> from astropy.modeling import fitting
>>> np.random.seed(42)
>>> x = np.linspace(-3, 3, 50)
>>> y = np.exp(-x**2) + 0.1 * np.random.randn(50)
>>> xs = np.linspace(-3, 3, 1000)
A 1D interpolating spline can be fit to data:
>>> fitter = fitting.SplineInterpolateFitter()
>>> spl = fitter(Spline1D(), x, y)
Similarly, a smoothing spline can be fit to data:
>>> fitter = fitting.SplineSmoothingFitter()
>>> spl = fitter(Spline1D(), x, y, s=0.5)
Similarly, a spline can be fit to data using an exact set of interior knots:
>>> t = [-1, 0, 1]
>>> fitter = fitting.SplineExactKnotsFitter()
>>> spl = fitter(Spline1D(), x, y, t=t)
"""
n_inputs = 1
n_outputs = 1
_separable = True
optional_inputs = {"nu": 0}
def __init__(
self,
knots=None,
coeffs=None,
degree=3,
bounds=None,
n_models=None,
model_set_axis=None,
name=None,
meta=None,
):
super().__init__(
knots=knots,
coeffs=coeffs,
degree=degree,
bounds=bounds,
n_models=n_models,
model_set_axis=model_set_axis,
name=name,
meta=meta,
)
@property
def t(self):
"""
The knots vector
"""
if self._t is None:
return np.concatenate(
(np.zeros(self._degree + 1), np.ones(self._degree + 1))
)
else:
return self._t
@t.setter
def t(self, value):
if self._t is None:
raise ValueError(
"The model parameters must be initialized before setting knots."
)
elif len(value) == len(self._t):
self._t = value
else:
raise ValueError(
"There must be exactly as many knots as previously defined."
)
@property
def t_interior(self):
"""
The interior knots
"""
return self.t[self.degree + 1 : -(self.degree + 1)]
@property
def c(self):
"""
The coefficients vector
"""
if self._c is None:
return np.zeros(len(self.t))
else:
return self._c
@c.setter
def c(self, value):
if self._c is None:
raise ValueError(
"The model parameters must be initialized before setting coeffs."
)
elif len(value) == len(self._c):
self._c = value
else:
raise ValueError(
"There must be exactly as many coeffs as previously defined."
)
@property
def degree(self):
"""
The degree of the spline polynomials
"""
return self._degree
@property
def _initialized(self):
return self._t is not None and self._c is not None
@property
def tck(self):
"""
Scipy 'tck' tuple representation
"""
return (self.t, self.c, self.degree)
@tck.setter
def tck(self, value):
if self._initialized:
if value[2] != self.degree:
raise ValueError("tck has incompatible degree!")
self.t = value[0]
self.c = value[1]
else:
self._init_spline(value[0], value[1])
# Calling this will properly fill the _parameter vector, which is
# used directly sometimes without being properly filled.
_ = self.parameters
@property
def bspline(self):
"""
Scipy bspline object representation
"""
from scipy.interpolate import BSpline
return BSpline(*self.tck)
@bspline.setter
def bspline(self, value):
from scipy.interpolate import BSpline
if isinstance(value, BSpline):
self.tck = value.tck
else:
self.tck = value
@property
def knots(self):
"""
Dictionary of knot parameters
"""
return [getattr(self, knot) for knot in self._knot_names]
@property
def user_knots(self):
"""If the knots have been supplied by the user"""
return self._user_knots
@user_knots.setter
def user_knots(self, value):
self._user_knots = value
@property
def coeffs(self):
"""
Dictionary of coefficient parameters
"""
return [getattr(self, coeff) for coeff in self._coeff_names]
def _init_parameters(self):
self._knot_names = self._create_parameters("knot", "t", fixed=True)
self._coeff_names = self._create_parameters("coeff", "c")
def _init_bounds(self, bounds=None):
if bounds is None:
bounds = [None, None]
if bounds[0] is None:
lower = np.zeros(self._degree + 1)
else:
lower = np.array([bounds[0]] * (self._degree + 1))
if bounds[1] is None:
upper = np.ones(self._degree + 1)
else:
upper = np.array([bounds[1]] * (self._degree + 1))
if bounds[0] is not None and bounds[1] is not None:
self.bounding_box = bounds
has_bounds = True
else:
has_bounds = False
return has_bounds, lower, upper
def _init_knots(self, knots, has_bounds, lower, upper):
if np.issubdtype(type(knots), np.integer):
self._t = np.concatenate((lower, np.zeros(knots), upper))
elif isiterable(knots):
self._user_knots = True
if has_bounds:
self._t = np.concatenate((lower, np.array(knots), upper))
else:
if len(knots) < 2 * (self._degree + 1):
raise ValueError(
f"Must have at least {2*(self._degree + 1)} knots."
)
self._t = np.array(knots)
else:
raise ValueError(f"Knots: {knots} must be iterable or value")
# check that knots form a viable spline
self.bspline
def _init_coeffs(self, coeffs=None):
if coeffs is None:
self._c = np.zeros(len(self._t))
else:
self._c = np.array(coeffs)
# check that coeffs form a viable spline
self.bspline
def _init_data(self, knots, coeffs, bounds=None):
self._init_knots(knots, *self._init_bounds(bounds))
self._init_coeffs(coeffs)
[docs] def evaluate(self, *args, **kwargs):
"""
Evaluate the spline.
Parameters
----------
x :
(positional) The points where the model is evaluating the spline at
nu : optional
(kwarg) The derivative of the spline for evaluation, 0 <= nu <= degree + 1.
Default: 0.
"""
kwargs = super().evaluate(*args, **kwargs)
x = args[0]
if "nu" in kwargs:
if kwargs["nu"] > self.degree + 1:
raise RuntimeError(
"Cannot evaluate a derivative of "
f"order higher than {self.degree + 1}"
)
return self.bspline(x, **kwargs)
[docs] def derivative(self, nu=1):
"""
Create a spline that is the derivative of this one
Parameters
----------
nu : int, optional
Derivative order, default is 1.
"""
if nu <= self.degree:
bspline = self.bspline.derivative(nu=nu)
derivative = Spline1D(degree=bspline.k)
derivative.bspline = bspline
return derivative
else:
raise ValueError(f"Must have nu <= {self.degree}")
[docs] def antiderivative(self, nu=1):
"""
Create a spline that is an antiderivative of this one
Parameters
----------
nu : int, optional
Antiderivative order, default is 1.
Notes
-----
Assumes constant of integration is 0
"""
if (nu + self.degree) <= 5:
bspline = self.bspline.antiderivative(nu=nu)
antiderivative = Spline1D(degree=bspline.k)
antiderivative.bspline = bspline
return antiderivative
else:
raise ValueError(
"Supported splines can have max degree 5, "
f"antiderivative degree will be {nu + self.degree}"
)
class _SplineFitter(abc.ABC):
"""
Base Spline Fitter
"""
def __init__(self):
self.fit_info = {"resid": None, "spline": None}
def _set_fit_info(self, spline):
self.fit_info["resid"] = spline.get_residual()
self.fit_info["spline"] = spline
@abc.abstractmethod
def _fit_method(self, model, x, y, **kwargs):
raise NotImplementedError("This has not been implemented for _SplineFitter.")
def __call__(self, model, x, y, z=None, **kwargs):
model_copy = model.copy()
if isinstance(model_copy, Spline1D):
if z is not None:
raise ValueError("1D model can only have 2 data points.")
spline = self._fit_method(model_copy, x, y, **kwargs)
else:
raise ModelDefinitionError(
"Only spline models are compatible with this fitter."
)
self._set_fit_info(spline)
return model_copy
[docs]class SplineInterpolateFitter(_SplineFitter):
"""
Fit an interpolating spline
"""
def _fit_method(self, model, x, y, **kwargs):
weights = kwargs.pop("weights", None)
bbox = kwargs.pop("bbox", [None, None])
if model.user_knots:
warnings.warn(
"The current user specified knots maybe ignored for interpolating data",
AstropyUserWarning,
)
model.user_knots = False
if bbox != [None, None]:
model.bounding_box = bbox
from scipy.interpolate import InterpolatedUnivariateSpline
spline = InterpolatedUnivariateSpline(
x, y, w=weights, bbox=bbox, k=model.degree
)
model.tck = spline._eval_args
return spline
[docs]class SplineSmoothingFitter(_SplineFitter):
"""
Fit a smoothing spline
"""
def _fit_method(self, model, x, y, **kwargs):
s = kwargs.pop("s", None)
weights = kwargs.pop("weights", None)
bbox = kwargs.pop("bbox", [None, None])
if model.user_knots:
warnings.warn(
"The current user specified knots maybe ignored for smoothing data",
AstropyUserWarning,
)
model.user_knots = False
if bbox != [None, None]:
model.bounding_box = bbox
from scipy.interpolate import UnivariateSpline
spline = UnivariateSpline(x, y, w=weights, bbox=bbox, k=model.degree, s=s)
model.tck = spline._eval_args
return spline
[docs]class SplineExactKnotsFitter(_SplineFitter):
"""
Fit a spline using least-squares regression.
"""
def _fit_method(self, model, x, y, **kwargs):
t = kwargs.pop("t", None)
weights = kwargs.pop("weights", None)
bbox = kwargs.pop("bbox", [None, None])
if t is not None:
if model.user_knots:
warnings.warn(
"The current user specified knots will be "
"overwritten for by knots passed into this function",
AstropyUserWarning,
)
else:
if model.user_knots:
t = model.t_interior
else:
raise RuntimeError("No knots have been provided")
if bbox != [None, None]:
model.bounding_box = bbox
from scipy.interpolate import LSQUnivariateSpline
spline = LSQUnivariateSpline(x, y, t, w=weights, bbox=bbox, k=model.degree)
model.tck = spline._eval_args
return spline
[docs]class SplineSplrepFitter(_SplineFitter):
"""
Fit a spline using the `scipy.interpolate.splrep` function interface.
"""
def __init__(self):
super().__init__()
self.fit_info = {"fp": None, "ier": None, "msg": None}
def _fit_method(self, model, x, y, **kwargs):
t = kwargs.pop("t", None)
s = kwargs.pop("s", None)
task = kwargs.pop("task", 0)
weights = kwargs.pop("weights", None)
bbox = kwargs.pop("bbox", [None, None])
if t is not None:
if model.user_knots:
warnings.warn(
"The current user specified knots will be "
"overwritten for by knots passed into this function",
AstropyUserWarning,
)
else:
if model.user_knots:
t = model.t_interior
if bbox != [None, None]:
model.bounding_box = bbox
from scipy.interpolate import splrep
tck, fp, ier, msg = splrep(
x,
y,
w=weights,
xb=bbox[0],
xe=bbox[1],
k=model.degree,
s=s,
t=t,
task=task,
full_output=1,
)
model.tck = tck
return fp, ier, msg
def _set_fit_info(self, spline):
self.fit_info["fp"] = spline[0]
self.fit_info["ier"] = spline[1]
self.fit_info["msg"] = spline[2]