#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Generic curve fitting module."""
# System modules
from collections import Counter
from copy import deepcopy
from os import (
cpu_count,
mkdir,
path,
)
from re import sub
from shutil import rmtree
from sys import float_info
#from time import time
# Third party modules
try:
from joblib import (
Parallel,
delayed,
)
HAVE_JOBLIB = True
except ImportError:
HAVE_JOBLIB = False
from nexusformat.nexus import NXdata
import numpy as np
# Local modules
from CHAP.processor import Processor
from CHAP.utils.general import (
# is_int,
is_index,
index_nearest,
quick_plot,
)
FLOAT_MIN = float_info.min
FLOAT_MAX = float_info.max
FLOAT_EPS = float_info.epsilon
# sigma = fwhm_factor*fwhm
fwhm_factor = {
'gaussian': 'fwhm/(2*sqrt(2*log(2)))',
'lorentzian': '0.5*fwhm',
'splitlorentzian': '0.5*fwhm', # sigma = sigma_r
'voight': '0.2776*fwhm', # sigma = gamma
'pvoigt': '0.5*fwhm', # fraction = 0.5
}
# amplitude = height_factor*height*fwhm
height_factor = {
'gaussian': 'height*fwhm*0.5*sqrt(pi/log(2))',
'lorentzian': 'height*fwhm*0.5*pi',
'splitlorentzian': 'height*fwhm*0.5*pi', # sigma = sigma_r
'voight': '3.334*height*fwhm', # sigma = gamma
'pvoigt': '1.268*height*fwhm', # fraction = 0.5
}
[docs]
class FitProcessor(Processor):
"""A processor to perform a fit on a data set or data map. """
[docs]
def process(self, data, config=None):
"""Fit the data and return a :class:`~CHAP.utils.fit.Fit` or
:class:`~CHAP.utils.fit.FitMap` object depending on the
dimensionality of the input data. The input data should be or
contain a NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
object, with properly defined signal and axis, or a
:class:`~CHAP.utils.fit.Fit` or :class:`~CHAP.utils.fit.FitMap`
object from a previous fit.
:param data: Input data containing the
nexusformat.nexus.NXdata object to fit.
:type data: list[PipelineData] or Fit or FitMap or
nexusformat.nexus.NXdata
:param config: Fit configuration.
:type config: dict, optional
:raises ValueError: Invalid input or configuration parameter.
:return: The fitted data object.
:rtype: Fit or FitMap
"""
# Local modules
from CHAP.utils.models import (
FitConfig,
Multipeak,
)
# Unwrap the PipelineData if called as a Pipeline Processor
if (not isinstance(data, (Fit, FitMap))
and not isinstance(data, NXdata)):
data = self.get_pipelinedata_item(data)
# Get the validated fit configuration
fit_config = None
if config is not None:
try:
fit_config = FitConfig(**config)
except Exception as exc:
raise RuntimeError from exc
if isinstance(data, (Fit, FitMap)):
# Refit/continue the fit with possibly updated parameters
fit = data
if isinstance(data, FitMap):
fit.fit(config=fit_config)
else:
fit.fit(config=fit_config)
if fit_config is not None:
if fit_config.print_report:
fit.print_fit_report()
if fit_config.plot:
fit.plot(skip_init=True)
elif isinstance(data, NXdata):
# Get the default NXdata object
try:
nxdata = data.get_default()
assert nxdata is not None
except (AssertionError, ValueError) as exc:
if nxdata is None or nxdata.nxclass != 'NXdata':
raise ValueError(
'Invalid default pathway to an NXdata '
f'object in ({data})') from exc
# Expand multipeak model if present
found_multipeak = False
for i, model in enumerate(deepcopy(fit_config.models)):
if isinstance(model, Multipeak):
if found_multipeak:
raise ValueError(
f'Invalid parameter models ({fit_config.models}) '
'(multiple instances of multipeak not allowed)')
parameters, models = self.create_multipeak_model(model)
if parameters:
fit_config.parameters += parameters
fit_config.models += models
fit_config.models.pop(i)
found_multipeak = True
# Instantiate the Fit or FitMap object and fit the data
if np.squeeze(nxdata.nxsignal).ndim == 1:
fit = Fit(nxdata, fit_config, self.logger)
fit.fit()
if fit_config.print_report:
fit.print_fit_report()
if fit_config.plot:
fit.plot(skip_init=True)
else:
fit = FitMap(nxdata, fit_config, self.logger)
fit.fit(
rel_height_cutoff=fit_config.rel_height_cutoff,
num_proc=fit_config.num_proc, plot=fit_config.plot,
print_report=fit_config.print_report)
else:
raise ValueError(f'Invalid input data ({type(data)}: {data})')
return fit
[docs]
@staticmethod
def create_multipeak_model(model_config):
"""Create a multipeak model.
:param model_config: A Multipeak fit model class.
:type model_config: :class:`~CHAP.utils.models.Multipeak`
:return: The fit parameters and fit model classes.
:rtype: list[:attr:`~CHAP.utils.models.FitParameter`],
list[:attr:`~CHAP.utils.models.FitConfig.models`]
"""
# Local modules
from CHAP.utils.models import (
FitParameter,
models,
)
peak_model_name = model_config.peak_models
peak_model_class = models[peak_model_name]['class']
parameters = []
peak_models = []
num_peak = len(model_config.centers)
if num_peak == 1 and model_config.fit_type == 'uniform':
model_config.fit_type = 'unconstrained'
sig_min = FLOAT_MIN
sig_max = np.inf
if (model_config.fwhm_min is not None
or model_config.fwhm_max is not None):
# Third party modules
from asteval import Interpreter
ast = Interpreter()
if model_config.fwhm_min is not None:
ast(f'fwhm = {model_config.fwhm_min}')
sig_min = ast(fwhm_factor[model_config.peak_models])
if model_config.fwhm_max is not None:
ast(f'fwhm = {model_config.fwhm_max}')
sig_max = ast(fwhm_factor[model_config.peak_models])
prefix = ''
if model_config.fit_type == 'uniform':
parameters.append(FitParameter(
name='scale_factor', value=1.0, min=FLOAT_MIN))
for i, cen in enumerate(model_config.centers):
if num_peak > 1:
prefix = f'peak{i+1}_'
peak_models.append(peak_model_class(
model=peak_model_name,
prefix=prefix,
parameters=[
{'name': 'amplitude', 'min': FLOAT_MIN},
{'name': 'center', 'expr': f'scale_factor*{cen}'},
{'name': 'sigma', 'min': sig_min, 'max': sig_max}]))
else:
for i, cen in enumerate(model_config.centers):
if num_peak > 1:
prefix = f'peak{i+1}_'
if model_config.centers_range == 0:
peak_models.append(peak_model_class(
model=peak_model_name,
prefix=prefix,
parameters=[
{'name': 'amplitude', 'min': FLOAT_MIN},
{'name': 'center', 'value': cen, 'vary': False},
{'name': 'sigma', 'min': sig_min, 'max': sig_max}
]))
else:
if model_config.centers_range is None:
cen_min = None
cen_max = None
else:
cen_min = cen - model_config.centers_range
cen_max = cen + model_config.centers_range
peak_models.append(peak_model_class(
model=peak_model_name,
prefix=prefix,
parameters=[
{'name': 'amplitude', 'min': FLOAT_MIN},
{'name': 'center', 'value': cen, 'min': cen_min,
'max': cen_max},
{'name': 'sigma', 'min': sig_min, 'max': sig_max}
]))
return parameters, peak_models
[docs]
class Component():
"""A model fit component."""
def __init__(self, model, prefix=''):
"""Initialize Component.
:param model: A fit model class.
:type model: :attr:`~CHAP.utils.models.FitConfig.models`
:param prefix: Model prefix.
:type prefix: str, optional
"""
# Local modules
from CHAP.utils.models import models
self.func = models[model.model]['name']
self.param_names = [f'{prefix}{par.name}' for par in model.parameters]
self.prefix = prefix
self._name = model.model
[docs]
class Components(dict):
"""The dictionary of model fit components."""
def __init__(self):
"""Initialize Components."""
super().__init__(self)
def __setitem__(self, key, value):
if key not in self and not isinstance(key, str):
raise KeyError(f'Invalid component name ({key})')
if not isinstance(value, Component):
raise ValueError(f'Invalid component ({value})')
dict.__setitem__(self, key, value)
value.name = key
@property
def components(self):
"""Return the model fit components.
:type: list[:attr:`~CHAP.utils.models.FitConfig.models`]
"""
return self.values()
[docs]
def add(self, model, prefix=''):
"""Add a model to the model fit components dictionary.
:param model: A fit model class.
:type model: :attr:`~CHAP.utils.models.FitConfig.models`
:param prefix: Model prefix.
:type prefix: str, optional
"""
# Local modules
from CHAP.utils.models import model_classes
if not isinstance(model, model_classes):
raise ValueError(f'Invalid parameter model ({model})')
if not isinstance(prefix, str):
raise ValueError(f'Invalid parameter prefix ({prefix})')
name = f'{prefix}{model.model}'
self.__setitem__(name, Component(model, prefix))
[docs]
class Parameters(dict):
"""A dictionary of FitParameter objects, mimicking the
functionality of a similarly named
`class in the lmfit library <https://lmfit.github.io/lmfit-py/parameters.html#lmfit.parameter.Parameters>`__.
"""
def __init__(self):
"""Initialize Parameters."""
super().__init__(self)
def __setitem__(self, key, value):
# Local modules
from CHAP.utils.models import FitParameter
if key in self:
raise KeyError(f'Duplicate name for FitParameter ({key})')
if key not in self and not isinstance(key, str):
raise KeyError(f'Invalid FitParameter name ({key})')
if value is not None and not isinstance(value, FitParameter):
raise ValueError(f'Invalid FitParameter ({value})')
dict.__setitem__(self, key, value)
value.name = key
[docs]
def add(self, parameter, prefix=''):
"""Add a fit parameter.
:param parameter: The fit parameter to add to the dictionary.
:type parameter: str or FitParameter
:param prefix: Prefix of the model to which this parameter
belongs, defaults to `''`.
:type prefix: str, optional
"""
# Local modules
from CHAP.utils.models import FitParameter
if isinstance(parameter, FitParameter):
name = f'{prefix}{parameter.name}'
self.__setitem__(name, parameter)
else:
raise RuntimeError('Must test')
parameter = f'{prefix}{parameter}'
self.__setitem__(
parameter,
FitParameter(name=parameter))
setattr(self[parameter.name], '_prefix', prefix)
[docs]
class ModelResult():
"""The result of a model fit, mimicking the functionality of a
similarly named
`class in the lmfit library <https://lmfit.github.io/lmfit-py/model.html#lmfit.model.ModelResult>`__.
"""
def __init__(
self, model, parameters, x=None, y=None, method=None, ast=None,
res_par_exprs=None, res_par_indices=None, res_par_names=None,
result=None):
"""Initialize SetNumexprThreads.
:param model: Fit model.
:type model: Components or lmfit.model.Model
:param parameters: Fit parameters.
:type parameters: Parameters or lmfit.parameter.Parameter
:param x: x-coordinates.
:type x: array-like, optional
:param y: y-coordinates.
:type y: array-like, optional
:param method: Minimization method name.
:type method: str, optional
:param ast:
`Asteval <https://lmfit.github.io/asteval/>`
`Interpreter <https://lmfit.github.io/asteval/api.html#the-interpreter-class>`__.
:type ast: asteval.Interpreter, optional
:param res_par_exprs: The expression parameter expressions.
:type res_par_exprs: list[dict], optional
:param res_par_indices: The parameter indices of all free fit
parameters in the list of fit parameters.
:type res_par_indices: list[int], optional
:param res_par_names: The parameter names of all free fit
parameters in the list of fit parameters.
:type res_par_names: list[str], optional
"""
self.components = model.components
self.params = deepcopy(parameters)
if x is None:
self.success = False
return
if method == 'leastsq':
best_pars = result[0]
self.ier = result[4]
self.message = result[3]
self.nfev = result[2]['nfev']
self.residual = result[2]['fvec']
self.success = 1 <= result[4] <= 4
else:
best_pars = result.x
self.ier = result.status
self.message = result.message
self.nfev = result.nfev
self.residual = result.fun
self.success = result.success
self.best_fit = y + self.residual
self.method = method
self.ndata = len(self.residual)
self.nvarys = len(res_par_indices)
self.x = x
self._ast = ast
self._expr_pars = {}
# Get the covarience matrix
self.chisqr = (self.residual**2).sum()
self.redchi = self.chisqr / (self.ndata-self.nvarys)
self.covar = None
if method == 'leastsq':
if result[1] is not None:
self.covar = result[1]*self.redchi
else:
try:
self.covar = self.redchi * np.linalg.inv(
np.dot(result.jac.T, result.jac))
except Exception:
self.covar = None
# Update the fit parameters with the fit result
par_names = list(self.params.keys())
self.var_names = []
for i, (value, index) in enumerate(zip(best_pars, res_par_indices)):
par = self.params[par_names[index]]
par.set(value=value)
stderr = None
if self.covar is not None:
stderr = self.covar[i,i]
if stderr is not None:
if stderr < 0.0:
stderr = None
else:
stderr = np.sqrt(stderr)
self.var_names.append(par.name)
if res_par_exprs:
# Third party modules
from sympy import diff
for value, name in zip(best_pars, res_par_names):
self._ast.symtable[name] = value
for par_expr in res_par_exprs:
name = par_names[par_expr['index']]
expr = par_expr['expr']
par = self.params[name]
par.set(value=self._ast.eval(expr))
self._expr_pars[name] = expr
stderr = None
if self.covar is not None:
stderr = 0
for i, name in enumerate(self.var_names):
d = diff(expr, name)
if not d:
continue
for ii, nname in enumerate(self.var_names):
dd = diff(expr, nname)
if not dd:
continue
stderr += (self._ast.eval(str(d))
* self._ast.eval(str(dd))
* self.covar[i,ii])
stderr = np.sqrt(stderr)
setattr(par, '_stderr', stderr)
[docs]
def eval_components(self, x=None, parameters=None):
"""Evaluate each component of a composite model function.
:param x: Independent variable, defaults to `None`, in which
case the class variable x is used.
:type x: array-like, optional
:param parameters: Composite model parameters, defaults to
`None`, in which case the class variable params is used.
:type parameters: Parameters or lmfit.parameter.Parameter,
optional
:return: A dictionary with component name and evaluated
function values key, value pairs.
:rtype: dict
"""
if x is None:
x = self.x
if parameters is None:
parameters = self.params
result = {}
for component in self.components:
if 'tmp_normalization_offset_c' in component.param_names:
continue
par_values = tuple(
parameters[par].value for par in component.param_names)
if component.prefix == '':
name = component._name
else:
name = component.prefix
result[name] = component.func(x, *par_values)
return result
[docs]
def fit_report(self, show_correl=False):
"""Generates a report of the fitting results with their best
parameter values and uncertainties.
:param show_correl: Whether to show list of correlations,
defaults to `False`.
:type show_correl: bool, optional
"""
# FIX add show_correl option
# Local modules
from CHAP.utils.general import (
getfloat_attr,
gformat,
)
buff = []
add = buff.append
parnames = list(self.params.keys())
namelen = max(len(n) for n in parnames)
add("[[Fit Statistics]]")
add(f" # fitting method = {self.method}")
add(f" # function evals = {getfloat_attr(self, 'nfev')}")
add(f" # data points = {getfloat_attr(self, 'ndata')}")
add(f" # variables = {getfloat_attr(self, 'nvarys')}")
add(f" chi-square = {getfloat_attr(self, 'chisqr')}")
add(f" reduced chi-square = {getfloat_attr(self, 'redchi')}")
# add(f" Akaike info crit = {getfloat_attr(self, 'aic')}")
# add(f" Bayesian info crit = {getfloat_attr(self, 'bic')}")
# if hasattr(self, 'rsquared'):
# add(f" R-squared = {getfloat_attr(self, 'rsquared')}")
add("[[Variables]]")
for name in parnames:
par = self.params[name]
space = ' '*(namelen-len(name))
nout = f'{name}:{space}'
inval = '(init = ?)'
if par.init_value is not None:
inval = f'(init = {par.init_value:.7g})'
expr = self._expr_pars.get(name, par.expr)
if expr is not None:
val = self._ast.eval(expr)
else:
val = par.value
try:
val = gformat(par.value)
except (TypeError, ValueError):
val = ' Non Numeric Value?'
if par.stderr is not None:
serr = gformat(par.stderr)
try:
spercent = f'({abs(par.stderr/par.value):.2%})'
except ZeroDivisionError:
spercent = ''
val = f'{val} +/-{serr} {spercent}'
if par.vary:
add(f' {nout} {val} {inval}')
elif expr is not None:
add(f" {nout} {val} == '{expr}'")
else:
add(f' {nout} {par.value:.7g} (fixed)')
return '\n'.join(buff)
[docs]
class Fit:
"""Wrapper class for scipy/lmfit."""
def __init__(self, nxdata, config, logger):
"""Initialize Fit.
:param nxdata: The input data.
:type nxdata: nexusformat.nexus.NXdata
:param config: Fit configuration.
:type config: CHAP.utils.models.FitConfig, optional
:param logger: A python Logger object.
:type logger: logging.Logger
"""
self._code = config.code
for model in config.models:
if model.model == 'expression' and self._code != 'lmfit':
self._code = 'lmfit'
logger.warning('Using lmfit instead of scipy with '
'an expression model')
if self._code == 'scipy':
# Local modules
from CHAP.utils.fit import Parameters
else:
# Third party modules
from lmfit import Parameters
self._logger = logger
self._mask = None
self._method = config.method
self._model = None
self._norm = None
self._normalized = False
self._free_parameters = []
self._parameters = Parameters()
if self._code == 'scipy':
self._ast = None
self._res_num_pars = []
self._res_par_exprs = []
self._res_par_indices = []
self._res_par_names = []
self._res_par_values = []
self._parameter_bounds = None
self._linear_parameters = []
self._nonlinear_parameters = []
self._result = None
# self._try_linear_fit = True
# self._fwhm_min = None
# self._fwhm_max = None
# self._sigma_min = None
# self._sigma_max = None
self._x = None
self._y = None
self._y_norm = None
self._y_range = None
# if 'try_linear_fit' in kwargs:
# self._try_linear_fit = kwargs.pop('try_linear_fit')
# if not isinstance(self._try_linear_fit, bool):
# raise ValueError(
# 'Invalid value of keyword argument try_linear_fit '
# f'({self._try_linear_fit})')
if nxdata is not None:
if isinstance(nxdata.attrs['axes'], str):
dim_x = nxdata.attrs['axes']
else:
dim_x = nxdata.attrs['axes'][-1]
self._x = np.asarray(nxdata[dim_x])
self._y = np.squeeze(nxdata.nxsignal)
if self._x.ndim != 1:
raise ValueError(
f'Invalid x dimension ({self._x.ndim})')
if self._x.size != self._y.size:
raise ValueError(
f'Inconsistent x and y dimensions ({self._x.size} vs '
f'{self._y.size})')
# if 'mask' in kwargs:
# self._mask = kwargs.pop('mask')
if True: #self._mask is None:
y_min = float(self._y.min())
self._y_range = float(self._y.max())-y_min
if self._y_range > 0.0:
self._norm = (y_min, self._y_range)
# else:
# self._mask = np.asarray(self._mask).astype(bool)
# if self._x.size != self._mask.size:
# raise ValueError(
# f'Inconsistent x and mask dimensions ({self._x.size} '
# f'vs {self._mask.size})')
# y_masked = np.asarray(self._y)[~self._mask]
# y_min = float(y_masked.min())
# self._y_range = float(y_masked.max())-y_min
# if self._y_range > 0.0:
# self._norm = (y_min, self._y_range)
# Setup fit model
self._setup_fit_model(config.parameters, config.models)
@property
def best_errors(self):
"""Return errors in the best fit parameters.
:type: dict
"""
if self._result is None:
return None
return {name:self._result.params[name].stderr
for name in sorted(self._result.params)
if name != 'tmp_normalization_offset_c'}
@property
def best_fit(self):
"""Return the best fit.
:type: numpy.ndarray
"""
if self._result is None:
return None
return self._result.best_fit
@property
def best_parameters(self):
"""Return the best fit parameters.
:type: dict
"""
if self._result is None:
return None
parameters = {}
for name in sorted(self._result.params):
if name != 'tmp_normalization_offset_c':
par = self._result.params[name]
parameters[name] = {
'value': par.value,
'error': par.stderr,
'init_value': par.init_value,
'min': par.min,
'max': par.max,
'vary': par.vary, 'expr': par.expr
}
return parameters
@property
def best_values(self):
"""Return values for the best fit parameters.
:type: dict
"""
if self._result is None:
return None
return {name:self._result.params[name].value
for name in sorted(self._result.params)
if name != 'tmp_normalization_offset_c'}
@property
def best_vary(self):
"""Return vary parameters for the best fit parameters.
:type: dict
"""
if self._result is None:
return None
return {name:self._result.params[name].vary
for name in sorted(self._result.params)
if name != 'tmp_normalization_offset_c'}
@property
def chisqr(self):
"""Return chisqr values for the best fit parameters.
:type: numpy.ndarray
"""
if self._result is None:
return None
return self._result.chisqr
@property
def components(self):
"""Return fit model components info.
:type: dict
"""
# Third party modules
from lmfit.models import ExpressionModel
components = {}
if self._result is None:
self._logger.warning(
'Unable to collect components in Fit.components')
return components
for component in self._result.components:
if 'tmp_normalization_offset_c' in component.param_names:
continue
parameters = {}
for name in component.param_names:
par = self._parameters[name]
parameters[name] = {
'free': par.vary,
'value': self._result.params[name].value,
}
if par.expr is not None:
parameters[name]['expr'] = par.expr
expr = None
if isinstance(component, ExpressionModel):
name = component._name
if name[-1] == '_':
name = name[:-1]
expr = component.expr
else:
prefix = component.prefix
if prefix:
if prefix[-1] == '_':
prefix = prefix[:-1]
name = f'{prefix} ({component._name})'
else:
name = f'{component._name}'
if expr is None:
components[name] = {
'parameters': parameters,
}
else:
components[name] = {
'expr': expr,
'parameters': parameters,
}
return components
@property
def covar(self):
"""Return the covarience matrix for the best fit parameters.
:type: numpy.ndarray
"""
if self._result is None:
return None
return self._result.covar
@property
def init_parameters(self):
"""Return the initial parameters for the fit model.
:type: dict
"""
if self._result is None or self._result.init_params is None:
return None
parameters = {}
for name in sorted(self._result.init_params):
if name != 'tmp_normalization_offset_c':
par = self._result.init_params[name]
parameters[name] = {
'value': par.value,
'min': par.min,
'max': par.max,
'vary': par.vary,
'expr': par.expr,
}
return parameters
@property
def init_values(self):
"""Return the initial values for the fit parameters.
:type: dict
"""
if self._result is None or self._result.init_params is None:
return None
return {name:self._result.init_params[name].value
for name in sorted(self._result.init_params)
if name != 'tmp_normalization_offset_c'}
@property
def normalization_offset(self):
"""Return the `normalization_offset` value for the fit model.
:type: float
"""
if self._result is None:
return None
if self._norm is None:
return 0.0
if self._result.init_params is not None:
normalization_offset = float(
self._result.init_params['tmp_normalization_offset_c'].value)
else:
normalization_offset = float(
self._result.params['tmp_normalization_offset_c'].value)
return normalization_offset
@property
def num_func_eval(self):
"""Return the number of function evaluations for the best fit.
:type: int
"""
if self._result is None:
return None
return self._result.nfev
@property
def parameters(self):
"""Return the fit parameters info.
:type: dict
"""
return {name:{'min': par.min, 'max': par.max, 'vary': par.vary,
'expr': par.expr} for name, par in self._parameters.items()
if name != 'tmp_normalization_offset_c'}
@property
def redchi(self):
"""Return redchi for the best fit.
:type: dict
"""
if self._result is None:
return None
return self._result.redchi
@property
def residual(self):
"""Return the residual in the best fit.
:type: numpy.ndarray
"""
if self._result is None:
return None
# lmfit return the negative of the residual in its common
# definition as (data - fit)
return -self._result.residual
@property
def success(self):
"""Return the success value for the fit.
:type: bool
"""
if self._result is None:
return None
if not self._result.success:
self._logger.warning(
f'ier = {self._result.ier}: {self._result.message}')
if (self._code == 'lmfit' and self._result.ier
and self._result.ier != 5):
return True
return self._result.success
@property
def var_names(self):
"""Return the variable names for the covarience matrix
property.
:type: list[str]
"""
if self._result is None:
return None
return getattr(self._result, 'var_names', None)
@property
def x(self):
"""Return the input x-coordinates.
:type: numpy.ndarray
"""
return self._x
@property
def y(self):
"""Return the input y-coordinates.
:type: numpy.ndarray
"""
return self._y
[docs]
def print_fit_report(self, result=None, show_correl=False):
"""Print a fit report.
:param result: Model result, defaults to the `self._result`
class attribute.
:type result: ModelResult or lmfit.model.ModelResult, optional
:param show_correl: Whether to show list of correlations,
defaults to `False`.
:type show_correl: bool, optional
"""
if result is None:
result = self._result
if result is not None:
print(result.fit_report(show_correl=show_correl))
[docs]
def add_parameter(self, parameter):
"""Add a fit parameter to the fit model.
:param parameter: A new parameter to be added to the fit model.
:type parameter: FitParameter
"""
# Local modules
from CHAP.utils.models import FitParameter
if parameter.get('expr') is not None:
raise KeyError(f'Invalid "expr" key in parameter {parameter}')
name = parameter['name']
if not parameter['vary']:
self._logger.warning(
f'Ignoring min in parameter {name} in '
f'Fit.add_parameter (vary = {parameter["vary"]})')
parameter['min'] = -np.inf
self._logger.warning(
f'Ignoring max in parameter {name} in '
f'Fit.add_parameter (vary = {parameter["vary"]})')
parameter['max'] = np.inf
if self._code == 'scipy':
self._parameters.add(FitParameter(**parameter))
else:
self._parameters.add(**parameter)
self._free_parameters.append(name)
[docs]
def add_model(self, model, prefix):
"""Add a model component to the fit model.
:param model: A fit model class.
:type model: :attr:`~CHAP.utils.models.FitConfig.models`
:param prefix: Model prefix.
:type prefix: str
"""
# pylint: disable=possibly-used-before-assignment
if self._code == 'lmfit':
from lmfit.models import (
ConstantModel,
LinearModel,
QuadraticModel,
# PolynomialModel,
ExponentialModel,
GaussianModel,
LorentzianModel,
PseudoVoigtModel,
ExpressionModel,
# StepModel,
RectangleModel,
)
if model.model == 'expression':
expr = model.expr
else:
expr = None
parameters = model.parameters
model_name = model.model
if prefix is None:
pprefix = ''
else:
pprefix = prefix
if self._code == 'scipy':
new_parameters = []
for par in deepcopy(parameters):
self._parameters.add(par, pprefix)
if self._parameters[par.name].expr is None:
self._parameters[par.name].set(value=par.default)
new_parameters.append(par.name)
self._res_num_pars += [len(parameters)]
if model_name == 'constant':
# Par: c
if self._code == 'lmfit':
newmodel = ConstantModel(prefix=prefix)
self._linear_parameters.append(f'{pprefix}c')
elif model_name == 'linear':
# Par: slope, intercept
if self._code == 'lmfit':
newmodel = LinearModel(prefix=prefix)
self._linear_parameters.append(f'{pprefix}slope')
self._linear_parameters.append(f'{pprefix}intercept')
elif model_name == 'quadratic':
# Par: a, b, c
if self._code == 'lmfit':
newmodel = QuadraticModel(prefix=prefix)
self._linear_parameters.append(f'{pprefix}a')
self._linear_parameters.append(f'{pprefix}b')
self._linear_parameters.append(f'{pprefix}c')
# elif model_name == 'polynomial':
# # Par: c0, c1,..., c7
# degree = kwargs.get('degree')
# if degree is not None:
# kwargs.pop('degree')
# if degree is None or not is_int(degree, ge=0, le=7):
# raise ValueError(
# 'Invalid parameter degree for build-in step model '
# f'({degree})')
# if self._code == 'lmfit':
# newmodel = PolynomialModel(degree=degree, prefix=prefix)
# for i in range(degree+1):
# self._linear_parameters.append(f'{pprefix}c{i}')
elif model_name == 'exponential':
# Par: amplitude, decay
if self._code == 'lmfit':
newmodel = ExponentialModel(prefix=prefix)
self._linear_parameters.append(f'{pprefix}amplitude')
self._nonlinear_parameters.append(f'{pprefix}decay')
elif model_name == 'gaussian':
# Par: amplitude, center, sigma (fwhm, height)
if self._code == 'lmfit':
newmodel = GaussianModel(prefix=prefix)
# parameter norms for height and fwhm are needed to
# get correct errors
self._linear_parameters.append(f'{pprefix}amplitude')
self._nonlinear_parameters.append(f'{pprefix}center')
self._nonlinear_parameters.append(f'{pprefix}sigma')
elif model_name == 'lorentzian':
# Par: amplitude, center, sigma (fwhm, height)
if self._code == 'lmfit':
newmodel = LorentzianModel(prefix=prefix)
# parameter norms for height and fwhm are needed to
# get correct errors
self._linear_parameters.append(f'{pprefix}amplitude')
self._nonlinear_parameters.append(f'{pprefix}center')
self._nonlinear_parameters.append(f'{pprefix}sigma')
elif model_name == 'pvoigt':
# Par: amplitude, center, sigma (fwhm, height), fraction
if self._code == 'lmfit':
newmodel = PseudoVoigtModel(prefix=prefix)
# parameter norms for height and fwhm are needed to
# get correct errors
self._linear_parameters.append(f'{pprefix}amplitude')
self._linear_parameters.append(f'{pprefix}fraction')
self._nonlinear_parameters.append(f'{pprefix}center')
self._nonlinear_parameters.append(f'{pprefix}sigma')
# elif model_name == 'step':
# # Par: amplitude, center, sigma
# form = kwargs.get('form')
# if form is not None:
# kwargs.pop('form')
# if (form is None or form not in
# ('linear', 'atan', 'arctan', 'erf', 'logistic')):
# raise ValueError(
# 'Invalid parameter form for build-in step model '
# f'({form})')
# if self._code == 'lmfit':
# newmodel = StepModel(prefix=prefix, form=form)
# self._linear_parameters.append(f'{pprefix}amplitude')
# self._nonlinear_parameters.append(f'{pprefix}center')
# self._nonlinear_parameters.append(f'{pprefix}sigma')
elif model_name == 'rectangle':
# Par: amplitude, center1, center2, sigma1, sigma2
form = 'atan' #kwargs.get('form')
#if form is not None:
# kwargs.pop('form')
# RV: Implement and test other forms when needed
if (form is None or form not in
('linear', 'atan', 'arctan', 'erf', 'logistic')):
raise ValueError(
'Invalid parameter form for build-in rectangle model '
f'({form})')
if self._code == 'lmfit':
newmodel = RectangleModel(prefix=prefix, form=form)
self._linear_parameters.append(f'{pprefix}amplitude')
self._nonlinear_parameters.append(f'{pprefix}center1')
self._nonlinear_parameters.append(f'{pprefix}center2')
self._nonlinear_parameters.append(f'{pprefix}sigma1')
self._nonlinear_parameters.append(f'{pprefix}sigma2')
elif model_name == 'expression' and self._code == 'lmfit':
# Third party modules
from asteval import (
Interpreter,
get_ast_names,
)
for par in parameters:
if par.expr is not None:
raise KeyError(
f'Invalid "expr" key ({par.expr}) in '
f'parameter ({par}) for an expression model')
ast = Interpreter()
expr_parameters = [
name for name in get_ast_names(ast.parse(expr))
if (name != 'x' and name not in self._parameters
and name not in ast.symtable)]
if prefix is None:
newmodel = ExpressionModel(expr=expr)
else:
for name in expr_parameters:
expr = sub(rf'\b{name}\b', f'{prefix}{name}', expr)
expr_parameters = [
f'{prefix}{name}' for name in expr_parameters]
newmodel = ExpressionModel(expr=expr, name=model_name)
# Remove already existing names
for name in newmodel.param_names.copy():
if name not in expr_parameters:
newmodel._func_allargs.remove(name)
newmodel._param_names.remove(name)
else:
raise ValueError(f'Unknown fit model ({model_name})')
# Add the new model to the current one
if self._code == 'scipy':
if self._model is None:
self._model = Components()
self._model.add(model, prefix)
else:
if self._model is None:
self._model = newmodel
else:
self._model += newmodel
new_parameters = newmodel.make_params()
self._parameters += new_parameters
# Check linearity of expression model parameters
if self._code == 'lmfit' and isinstance(newmodel, ExpressionModel):
# Third party modules
from sympy import diff
for name in newmodel.param_names:
if not diff(newmodel.expr, name, name):
if name not in self._linear_parameters:
self._linear_parameters.append(name)
else:
if name not in self._nonlinear_parameters:
self._nonlinear_parameters.append(name)
# Scale the default initial model parameters
if self._norm is not None:
for name in new_parameters:
if name in self._linear_parameters:
par = self._parameters.get(name)
if par.expr is None:
if self._code == 'scipy':
value = par.default
else:
value = None
if value is None:
value = par.value
_min = par.min
_max = par.max
if not (model_name == 'pvoigt' and 'fraction' in name):
if value is not None:
value *= self._norm[1]
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min *= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max *= self._norm[1]
par.set(value=value, min=_min, max=_max)
# Initialize the model parameters
for parameter in deepcopy(parameters):
name = parameter.name
if name not in new_parameters:
name = pprefix+name
if name not in new_parameters:
raise ValueError(
f'Unable to match parameter {name}')
if parameter.expr is None:
self._parameters[name].set(
value=parameter.value, min=parameter.min,
max=parameter.max, vary=parameter.vary)
else:
if parameter.value is not None:
self._logger.warning(
'Ignoring input "value" for expression parameter'
f'{name} = {parameter.expr}')
if not np.isinf(parameter.min):
self._logger.warning(
'Ignoring input "min" for expression parameter'
f'{name} = {parameter.expr}')
if not np.isinf(parameter.max):
self._logger.warning(
'Ignoring input "max" for expression parameter'
f'{name} = {parameter.expr}')
self._parameters[name].set(
value=None, min=-np.inf, max=np.inf, expr=parameter.expr)
[docs]
def eval(self, x, result=None):
"""Evaluate the best fit.
:param x: x-coordinates.
:type x: array-like, optional
:param result: Model result, defaults to the `self._result`
class attribute.
:type result: ModelResult or lmfit.model.ModelResult, optional
"""
if result is None:
result = self._result
if result is None:
return None
return result.eval(x=np.asarray(x))-self.normalization_offset
[docs]
def fit(self, config=None, **kwargs):
"""Fit the model to the input data.
:param config: Fit configuration.
:type config: CHAP.utils.models.FitConfig, optional
:param kwargs: Additional key, value pairs to pass on directly
to the core fit routine.
:type kwargs: dict
"""
# Check input parameters
if self._model is None:
self._logger.error('Undefined fit model')
return None
self._mask = kwargs.pop('mask', None)
guess = kwargs.pop('guess', False)
if not isinstance(guess, bool):
raise ValueError(
f'Invalid value of keyword argument guess ({guess})')
if self._result is not None:
if guess:
self._logger.warning(
'Ignoring input parameter guess during refitting')
guess = False
# if 'try_linear_fit' in kwargs:
# raise RuntimeError('try_linear_fit needs testing')
# try_linear_fit = kwargs.pop('try_linear_fit')
# if not isinstance(try_linear_fit, bool):
# raise ValueError(
# 'Invalid value of keyword argument try_linear_fit '
# f'({try_linear_fit})')
# if not self._try_linear_fit:
# self._logger.warning(
# 'Ignore superfluous keyword argument "try_linear_fit" '
# '(not yet supported for callable models)')
# else:
# self._try_linear_fit = try_linear_fit
# Setup the fit
self._setup_fit(config, guess)
# Check if model is linear
try:
linear_model = self._check_linearity_model()
except Exception:
linear_model = False
if kwargs.get('check_only_linearity') is not None:
return linear_model
# Normalize the data and initial parameters
self._normalize()
if linear_model:
raise RuntimeError('linear solver needs testing')
# Perform a linear fit by direct matrix solution with numpy
try:
if self._mask is None:
self._fit_linear_model(self._x, self._y_norm)
else:
self._fit_linear_model(
self._x[~self._mask],
np.asarray(self._y_norm)[~self._mask])
except Exception:
linear_model = False
if not linear_model:
self._result = self._fit_nonlinear_model(
self._x, self._y_norm, **kwargs)
# Set internal parameter values to fit results upon success
if self.success:
for name, par in self._parameters.items():
if par.expr is None and par.vary:
par.set(value=self._result.params[name].value)
# Renormalize the data and results
self._renormalize()
return None
[docs]
def plot(
self, y=None, *, y_title=None, title=None, result=None,
skip_init=False, plot_comp=True, plot_comp_legends=False,
plot_residual=False, plot_masked_data=True, **kwargs):
"""Plot the best fit.
:param y: y-coordinates.
:type y: array-like, optional
:param y_title: y-axis label.
:type y_title: str, optional
:param title: Graph title.
:type title: str, optional
:param result: Model result, defaults to the `self._result`
class attribute.
:type result: ModelResult or lmfit.model.ModelResult, optional
:param skip_init: Skip plotting the initial guess, defaults
to `False`.
:type skip_init: bool, optional
:param plot_comp: Plot the individual model components,
defaults to `True`.
:type plot_comp: bool, optional
:param plot_comp_legends: Add a legend for the individual
model components, defaults to `False`.
:type plot_comp_legends: bool, optional
:param plot_residual: Plot the residual, defaults to `False`.
:type plot_residual: bool, optional
:param plot_masked_data:
:type plot_masked_data: bool, optional
:param kwargs: Additional key, value pairs to pass on directly
to the Matplotlib plot function.
:type kwargs: dict
"""
if result is None:
result = self._result
if result is None:
return
plots = []
legend = []
if self._mask is None:
mask = np.zeros(self._x.size).astype(bool)
plot_masked_data = False
else:
mask = self._mask
if y is not None:
if not isinstance(y, (tuple, list, np.ndarray)):
self._logger.warning('Ignorint invalid parameter y ({y}')
if len(y) != len(self._x):
self._logger.warning(
'Ignoring parameter y in plot (wrong dimension)')
y = None
if y is not None:
if y_title is None or not isinstance(y_title, str):
y_title = 'data'
plots += [(self._x, y, '.')]
legend += [y_title]
if self._y is not None:
plots += [(self._x, np.asarray(self._y), 'b.')]
legend += ['data']
if plot_masked_data:
plots += [(self._x[mask], np.asarray(self._y)[mask], 'bx')]
legend += ['masked data']
if isinstance(plot_residual, bool) and plot_residual:
plots += [(self._x[~mask], result.residual, 'r-')]
legend += ['residual']
plots += [(self._x[~mask], result.best_fit, 'k-')]
legend += ['best fit']
if not skip_init and hasattr(result, 'init_fit'):
plots += [(self._x[~mask], result.init_fit, 'g-')]
legend += ['init']
if plot_comp:
components = result.eval_components(x=self._x[~mask])
num_components = len(components)
if 'tmp_normalization_offset_' in components:
num_components -= 1
if num_components > 1:
eval_index = 0
for modelname, y_comp in components.items():
if modelname == 'tmp_normalization_offset_':
continue
if modelname == '_eval':
modelname = f'eval{eval_index}'
if len(modelname) > 20:
modelname = f'{modelname[0:16]} ...'
if isinstance(y_comp, (int, float)):
y_comp *= np.ones(self._x[~mask].size)
plots += [(self._x[~mask], y_comp, '--')]
if plot_comp_legends:
if modelname[-1] == '_':
legend.append(modelname[:-1])
else:
legend.append(modelname)
quick_plot(
tuple(plots), legend=legend, title=title, block=True, **kwargs)
[docs]
@staticmethod
def guess_init_peak(
x, y, *args, center_guess=None, use_max_for_center=True):
"""Return a guess for the initial height, center and fwhm for a
single peak.
"""
center_guesses = None
x = np.asarray(x)
y = np.asarray(y)
if len(x) != len(y):
print(
f'Invalid x and y lengths ({len(x)}, {len(y)}), '
'skip initial guess')
return None, None, None
if isinstance(center_guess, (int, float)):
if args:
print(
'Ignoring additional arguments for single center_guess '
'value')
elif isinstance(center_guess, (tuple, list, np.ndarray)):
if len(center_guess) == 1:
print(
'Ignoring additional arguments for single center_guess '
'value')
if not isinstance(center_guess[0], (int, float)):
raise ValueError(
'Invalid parameter center_guess '
f'({type(center_guess[0])})')
center_guess = center_guess[0]
else:
if len(args) != 1:
raise ValueError(
f'Invalid number of arguments ({len(args)})')
n = args[0]
if not is_index(n, 0, len(center_guess)):
raise ValueError('Invalid argument')
center_guesses = center_guess
center_guess = center_guesses[n]
elif center_guess is not None:
raise ValueError(
f'Invalid center_guess type ({type(center_guess)})')
# Sort the inputs
index = np.argsort(x)
x = x[index]
y = y[index]
miny = y.min()
# Set range for current peak
if center_guesses is not None:
if len(center_guesses) > 1:
index = np.argsort(center_guesses)
n = list(index).index(n)
center_guesses = np.asarray(center_guesses)[index]
if n == 0:
low = 0
upp = index_nearest(
x, (center_guesses[0]+center_guesses[1]) / 2)
elif n == len(center_guesses)-1:
low = index_nearest(
x, (center_guesses[n-1]+center_guesses[n]) / 2)
upp = len(x)
else:
low = index_nearest(
x, (center_guesses[n-1]+center_guesses[n]) / 2)
upp = index_nearest(
x, (center_guesses[n]+center_guesses[n+1]) / 2)
x = x[low:upp]
y = y[low:upp]
# Estimate FWHM
maxy = y.max()
if center_guess is None:
center_index = np.argmax(y)
center = x[center_index]
height = maxy-miny
else:
if use_max_for_center:
center_index = np.argmax(y)
center = x[center_index]
if center_index < 0.1*len(x) or center_index > 0.9*len(x):
center_index = index_nearest(x, center_guess)
center = center_guess
else:
center_index = index_nearest(x, center_guess)
center = center_guess
height = y[center_index]-miny
half_height = miny + 0.5*height
fwhm_index1 = 0
for i in range(center_index, fwhm_index1, -1):
if y[i] < half_height:
fwhm_index1 = i
break
fwhm_index2 = len(x)-1
for i in range(center_index, fwhm_index2):
if y[i] < half_height:
fwhm_index2 = i
break
if fwhm_index1 == 0 and fwhm_index2 < len(x)-1:
fwhm = 2 * (x[fwhm_index2]-center)
elif fwhm_index1 > 0 and fwhm_index2 == len(x)-1:
fwhm = 2 * (center-x[fwhm_index1])
else:
fwhm = x[fwhm_index2]-x[fwhm_index1]
if center_guess is not None and not use_max_for_center:
index = fwhm_index1+np.argmax(y[fwhm_index1:fwhm_index2])
center = x[index]
height = y[index]-miny
return height, center, fwhm
def _create_prefixes(self, models):
"""Create model prefixes."""
# Check for duplicate model names and create prefixes
names = []
prefixes = []
for model in models:
names.append(f'{model.prefix}{model.model}')
prefixes.append(model.prefix)
counts = Counter(names)
for model, count in counts.items():
if count > 1:
n = 0
for i, name in enumerate(names):
if name == model:
n += 1
prefixes[i] = f'{name}{n}_'
return prefixes
def _setup_fit_model(self, parameters, models):
"""Setup the fit model."""
# Check for duplicate model names and create prefixes
prefixes = self._create_prefixes(models)
# Add the free fit parameters
for par in parameters:
self.add_parameter(par.model_dump())
# Add the model functions
for prefix, model in zip(prefixes, models):
self.add_model(model, prefix)
# Check linearity of free fit parameters:
known_parameters = (
self._linear_parameters + self._nonlinear_parameters)
for name in reversed(self._parameters):
if name not in known_parameters:
for nname, par in self._parameters.items():
if par.expr is not None:
# Third party modules
from sympy import diff
if 'fraction' in par.expr:
expr = par.expr.replace('fraction', 'fraction_')
else:
expr = par.expr
if nname in self._nonlinear_parameters:
self._nonlinear_parameters.insert(0, name)
elif diff(expr, name, name):
self._nonlinear_parameters.insert(0, name)
else:
self._linear_parameters.insert(0, name)
def _setup_fit(self, config, guess=False):
"""Setup the fit."""
# Apply mask if supplied:
if self._mask is not None:
raise RuntimeError('mask needs testing')
self._mask = np.asarray(self._mask).astype(bool)
if self._x.size != self._mask.size:
raise ValueError(
f'Inconsistent x and mask dimensions ({self._x.size} vs '
f'{self._mask.size})')
# Estimate initial parameters
if guess and not isinstance(self, FitMap):
raise RuntimeError('Estimate initial parameters needs testing')
if self._mask is None:
xx = self._x
yy = self._y
else:
xx = self._x[~self._mask]
yy = np.asarray(self._y)[~self._mask]
try:
# Try with the build-in lmfit guess method
# (only implemented for a single model)
self._parameters = self._model.guess(yy, x=xx)
except Exception:
# Third party modules
from asteval import Interpreter
from lmfit.models import GaussianModel
ast = Interpreter()
# Should work for other peak-like models,
# but will need tests first
for component in self._model.components:
if isinstance(component, GaussianModel):
center = self._parameters[
f"{component.prefix}center"].value
height_init, cen_init, fwhm_init = \
self.guess_init_peak(
xx, yy, center_guess=center,
use_max_for_center=False)
# if (self._fwhm_min is not None
# and fwhm_init < self._fwhm_min):
# fwhm_init = self._fwhm_min
# elif (self._fwhm_max is not None
# and fwhm_init > self._fwhm_max):
# fwhm_init = self._fwhm_max
ast(f'fwhm = {fwhm_init}')
ast(f'height = {height_init}')
sig_init = ast(fwhm_factor[component._name])
amp_init = ast(height_factor[component._name])
par = self._parameters[
f"{component.prefix}amplitude"]
if par.vary:
par.set(value=amp_init)
par = self._parameters[
f"{component.prefix}center"]
if par.vary:
par.set(value=cen_init)
par = self._parameters[
f"{component.prefix}sigma"]
if par.vary:
par.set(value=sig_init)
# Add constant offset for a normalized model
if self._result is None and self._norm is not None and self._norm[0]:
# Local modules
from CHAP.utils.models import Constant
model = Constant(
model='constant',
parameters=[{
'name': 'c',
'value': -self._norm[0],
'vary': False,
}])
self.add_model(model, 'tmp_normalization_offset_')
# Adjust existing parameters for refit:
if config is not None:
# Local modules
from CHAP.utils.models import (
FitConfig,
Multipeak,
)
# Expand multipeak model if present
scale_factor = None
for i, model in enumerate(deepcopy(config.models)):
found_multipeak = False
if isinstance(model, Multipeak):
if found_multipeak:
raise ValueError(
f'Invalid parameter models ({config.models}) '
'(multiple instances of multipeak not allowed)')
if (model.fit_type == 'uniform'
and 'scale_factor' not in self._free_parameters):
raise ValueError(
f'Invalid parameter models ({config.models}) '
'(uniform multipeak fit after unconstrained fit)')
parameters, models = FitProcessor.create_multipeak_model(
model)
if (model.fit_type == 'unconstrained'
and 'scale_factor' in self._free_parameters):
# Third party modules
from asteval import Interpreter
scale_factor = self._parameters['scale_factor'].value
self._parameters.pop('scale_factor')
self._free_parameters.remove('scale_factor')
ast = Interpreter()
ast(f'scale_factor = {scale_factor}')
if parameters:
config.parameters += parameters
config.models += models
config.models.remove(model)
found_multipeak = True
# Check for duplicate model names and create prefixes
prefixes = self._create_prefixes(config.models)
if not isinstance(config, FitConfig):
raise ValueError(f'Invalid parameter config ({config})')
parameters = config.parameters
for prefix, model in zip(prefixes, config.models):
for par in model.parameters:
par.name = f'{prefix}{par.name}'
parameters += model.parameters
# Adjust parameters for refit as needed
if isinstance(self, FitMap):
scale_factor_index = \
self._best_parameters.index('scale_factor')
self._best_parameters.pop(scale_factor_index)
self._best_values = np.delete(
self._best_values, scale_factor_index, 0)
self._best_errors = np.delete(
self._best_errors, scale_factor_index, 0)
self._best_vary = np.delete(
self._best_vary, scale_factor_index, 0)
for par in parameters:
name = par.name
if name not in self._parameters:
raise ValueError(
f'Unable to match {name} parameter {par} to an '
'existing one')
ppar = self._parameters[name]
if ppar.expr is not None:
if (scale_factor is not None and 'center' in name
and 'scale_factor' in ppar.expr):
ppar.set(value=ast(ppar.expr), expr='')
value = ppar.value
else:
raise ValueError(
f'Unable to modify {name} parameter {par} '
'(currently an expression)')
else:
value = par.value
if par.expr is not None:
raise KeyError(
f'Invalid "expr" key in {name} parameter {par}')
ppar.set(
value=value, min=par.min, max=par.max, vary=par.vary)
# Set parameters configuration
if self._code == 'scipy':
self._res_par_exprs = []
self._res_par_indices = []
self._res_par_names = []
self._res_par_values = []
for i, (name, par) in enumerate(self._parameters.items()):
self._res_par_values.append(par.value)
if par.expr:
self._res_par_exprs.append(
{'expr': par.expr, 'index': i})
else:
if par.vary:
self._res_par_indices.append(i)
self._res_par_names.append(name)
# Check for uninitialized parameters
for name, par in self._parameters.items():
if par.expr is None:
value = par.value
if value is None or np.isinf(value) or np.isnan(value):
if (self._norm is None
or name in self._nonlinear_parameters):
self._parameters[name].set(value=1.0)
elif 'fraction' not in name:
self._parameters[name].set(value=self._norm[1])
def _check_linearity_model(self):
"""Identify the linearity of all model parameters and check if
the model is linear or not.
"""
# Third party modules
from lmfit.models import ExpressionModel
from sympy import diff
# if not self._try_linear_fit:
# self._logger.info(
# 'Skip linearity check (not yet supported for callable models)')
# return False
free_parameters = \
[name for name, par in self._parameters.items() if par.vary]
for component in self._model.components:
if 'tmp_normalization_offset_c' in component.param_names:
continue
if isinstance(component, ExpressionModel):
for name in free_parameters:
if diff(component.expr, name, name):
self._nonlinear_parameters.append(name)
if name in self._linear_parameters:
self._linear_parameters.remove(name)
else:
model_parameters = component.param_names.copy()
for basename, hint in component.param_hints.items():
name = f'{component.prefix}{basename}'
if hint.get('expr') is not None:
model_parameters.remove(name)
for name in model_parameters:
expr = self._parameters[name].expr
if expr is not None:
for nname in free_parameters:
if name in self._nonlinear_parameters:
if diff(expr, nname):
self._nonlinear_parameters.append(nname)
if nname in self._linear_parameters:
self._linear_parameters.remove(nname)
else:
assert name in self._linear_parameters
if diff(expr, nname, nname):
self._nonlinear_parameters.append(nname)
if nname in self._linear_parameters:
self._linear_parameters.remove(nname)
if any(True for name in self._nonlinear_parameters
if self._parameters[name].vary):
return False
return True
def _fit_linear_model(self, x, y):
"""Perform a linear fit by direct matrix solution with numpy.
"""
# Third party modules
from asteval import Interpreter
from lmfit.model import ModelResult
from lmfit.models import (
ConstantModel,
LinearModel,
QuadraticModel,
ExpressionModel,
)
# Third party modules
from sympy import (
diff,
simplify,
)
# FIX self._parameter_norms
# pylint: disable=no-member
raise RuntimeError
# Construct the matrix and the free parameter vector
free_parameters = \
[name for name, par in self._parameters.items() if par.vary]
expr_parameters = {
name:par.expr for name, par in self._parameters.items()
if par.expr is not None}
model_parameters = []
for component in self._model.components:
if 'tmp_normalization_offset_c' in component.param_names:
continue
model_parameters += component.param_names
for basename, hint in component.param_hints.items():
name = f'{component.prefix}{basename}'
if hint.get('expr') is not None:
expr_parameters.pop(name)
model_parameters.remove(name)
norm = 1.0
if self._normalized:
norm = self._norm[1]
# Add expression parameters to asteval
ast = Interpreter()
for name, expr in expr_parameters.items():
ast.symtable[name] = expr
# Add constant parameters to asteval
# (renormalize to use correctly in evaluation of expression
# models)
for name, par in self._parameters.items():
if par.expr is None and not par.vary:
if self._parameter_norms[name]:
ast.symtable[name] = par.value*norm
else:
ast.symtable[name] = par.value
mat_a = np.zeros((len(x), len(free_parameters)), dtype='float64')
y_const = np.zeros(len(x), dtype='float64')
have_expression_model = False
for component in self._model.components:
if isinstance(component, ConstantModel):
name = component.param_names[0]
if name in free_parameters:
mat_a[:,free_parameters.index(name)] = 1.0
else:
if self._parameter_norms[name]:
delta_y_const = \
self._parameters[name] * np.ones(len(x))
else:
delta_y_const = \
(self._parameters[name]*norm) * np.ones(len(x))
y_const += delta_y_const
elif isinstance(component, ExpressionModel):
have_expression_model = True
const_expr = component.expr
for name in free_parameters:
dexpr_dname = diff(component.expr, name)
if dexpr_dname:
const_expr = \
f'{const_expr}-({str(dexpr_dname)})*{name}'
if not self._parameter_norms[name]:
dexpr_dname = f'({dexpr_dname})/{norm}'
y_expr = [(lambda _: ast.eval(str(dexpr_dname)))
(ast(f'x={v}')) for v in x]
if ast.error:
raise ValueError(
f'Unable to evaluate {dexpr_dname}')
mat_a[:,free_parameters.index(name)] += y_expr
const_expr = str(simplify(f'({const_expr})/{norm}'))
delta_y_const = [(lambda _: ast.eval(const_expr))
(ast(f'x = {v}')) for v in x]
y_const += delta_y_const
if ast.error:
raise ValueError(f'Unable to evaluate {const_expr}')
else:
free_model_parameters = [
name for name in component.param_names
if name in free_parameters or name in expr_parameters]
if not free_model_parameters:
y_const += component.eval(params=self._parameters, x=x)
elif isinstance(component, LinearModel):
name = f'{component.prefix}slope'
if name in free_model_parameters:
mat_a[:,free_parameters.index(name)] = x
else:
y_const += self._parameters[name].value * x
name = f'{component.prefix}intercept'
if name in free_model_parameters:
mat_a[:,free_parameters.index(name)] = 1.0
else:
y_const += self._parameters[name].value \
* np.ones(len(x))
elif isinstance(component, QuadraticModel):
name = f'{component.prefix}a'
if name in free_model_parameters:
mat_a[:,free_parameters.index(name)] = x**2
else:
y_const += self._parameters[name].value * x**2
name = f'{component.prefix}b'
if name in free_model_parameters:
mat_a[:,free_parameters.index(name)] = x
else:
y_const += self._parameters[name].value * x
name = f'{component.prefix}c'
if name in free_model_parameters:
mat_a[:,free_parameters.index(name)] = 1.0
else:
y_const += self._parameters[name].value \
* np.ones(len(x))
else:
# At this point each build-in model must be
# strictly proportional to each linear model
# parameter. Without this assumption, the model
# equation is needed
# For the current build-in lmfit models, this can
# only ever be the amplitude
assert len(free_model_parameters) == 1
name = f'{component.prefix}amplitude'
assert free_model_parameters[0] == name
assert self._parameter_norms[name]
expr = self._parameters[name].expr
if expr is None:
parameters = deepcopy(self._parameters)
parameters[name].set(value=1.0)
mat_a[:,free_parameters.index(name)] += component.eval(
params=parameters, x=x)
else:
const_expr = expr
parameters = deepcopy(self._parameters)
parameters[name].set(value=1.0)
dcomp_dname = component.eval(params=parameters, x=x)
for nname in free_parameters:
dexpr_dnname = diff(expr, nname)
if dexpr_dnname:
assert self._parameter_norms[name]
y_expr = np.asarray(
dexpr_dnname*dcomp_dname, dtype='float64')
if self._parameter_norms[nname]:
mat_a[:,free_parameters.index(nname)] += \
y_expr
else:
mat_a[:,free_parameters.index(nname)] += \
y_expr/norm
const_expr = \
f'{const_expr}-({dexpr_dnname})*{nname}'
const_expr = str(simplify(f'({const_expr})/{norm}'))
y_expr = [
(lambda _: ast.eval(const_expr))(ast(f'x = {v}'))
for v in x]
delta_y_const = np.multiply(y_expr, dcomp_dname)
y_const += delta_y_const
solution, _, _, _ = np.linalg.lstsq(
mat_a, y-y_const, rcond=None)
# Assemble result
# (compensate for normalization in expression models)
for name, value in zip(free_parameters, solution):
self._parameters[name].set(value=value)
if (self._normalized
and (have_expression_model or expr_parameters)):
for name, norm in self._parameter_norms.items():
par = self._parameters[name]
if par.expr is None and norm and 'fraction' not in name:
self._parameters[name].set(value=par.value*self._norm[1])
#RV FIX
self._result = ModelResult(
self._model, deepcopy(self._parameters), 'linear')
self._result.best_fit = self._model.eval(params=self._parameters, x=x)
if (self._normalized
and (have_expression_model or expr_parameters)):
if 'tmp_normalization_offset_c' in self._parameters:
offset = self._parameters['tmp_normalization_offset_c']
else:
offset = 0.0
self._result.best_fit = \
(self._result.best_fit-offset-self._norm[0]) / self._norm[1]
if self._normalized:
for name, norm in self._parameter_norms.items():
par = self._parameters[name]
if par.expr is None and norm and 'fraction' not in name:
value = par.value/self._norm[1]
self._parameters[name].set(value=value)
self._result.params[name].set(value=value)
self._result.residual = y-self._result.best_fit
self._result.components = self._model.components
self._result.init_params = None
def _fit_nonlinear_model(self, x, y, **kwargs):
"""Perform a nonlinear fit with spipy or lmfit."""
# Check bounds and prevent initial values at boundaries
have_bounds = False
self._parameter_bounds = {}
for name, par in self._parameters.items():
if par.vary:
self._parameter_bounds[name] = {
'min': par.min, 'max': par.max}
if not have_bounds and (
not np.isinf(par.min) or not np.isinf(par.max)):
have_bounds = True
if have_bounds:
self._reset_par_at_boundary()
# Perform the fit
if self._mask is not None:
x = x[~self._mask]
y = np.asarray(y)[~self._mask]
if self._code == 'scipy':
# Third party modules
from asteval import Interpreter
from scipy.optimize import (
leastsq,
least_squares,
)
assert self._mask is None
self._ast = Interpreter()
self._ast.basesymtable = dict(self._ast.symtable.items())
pars_init = []
for i, (name, par) in enumerate(self._parameters.items()):
setattr(par, '_init_value', par.value)
self._res_par_values[i] = par.value
if par.expr is None:
self._ast.symtable[name] = par.value
if par.vary:
pars_init.append(par.value)
if have_bounds:
bounds = (
[v['min'] for v in self._parameter_bounds.values()],
[v['max'] for v in self._parameter_bounds.values()])
if self._method in ('lm', 'leastsq'):
self._method = 'trf'
self._logger.debug(
f'Fit method changed to {self._method} for fit with '
'bounds')
else:
bounds = (-np.inf, np.inf)
init_params = deepcopy(self._parameters)
# t0 = time()
lskws = {
'ftol': 1.49012e-08,
'xtol': 1.49012e-08,
'gtol': 10*FLOAT_EPS,
}
if self._method == 'leastsq':
lskws['maxfev'] = 64000
result = leastsq(
self._residual, pars_init, args=(x, y), full_output=True,
**lskws)
else:
lskws['max_nfev'] = 64000
result = least_squares(
self._residual, pars_init, bounds=bounds,
method=self._method, args=(x, y), **lskws)
# t1 = time()
# print(f'\n\nFitting took {1000*(t1-t0):.3f} ms\n\n')
model_result = ModelResult(
self._model, self._parameters, x, y, self._method, self._ast,
self._res_par_exprs, self._res_par_indices,
self._res_par_names, result)
model_result.init_params = init_params
model_result.init_values = {}
for name, par in init_params.items():
model_result.init_values[name] = par.value
model_result.max_nfev = lskws.get('maxfev')
else:
fit_kws = {}
# if 'Dfun' in kwargs:
# fit_kws['Dfun'] = kwargs.pop('Dfun')
# t0 = time()
model_result = self._model.fit(
y, self._parameters, x=x, method=self._method, fit_kws=fit_kws,
**kwargs)
# t1 = time()
# print(f'\n\nFitting took {1000*(t1-t0):.3f} ms\n\n')
return model_result
def _normalize(self):
"""Normalize the data and initial parameters."""
if self._normalized:
return
if self._norm is None:
if self._y is not None and self._y_norm is None:
self._y_norm = np.asarray(self._y)
else:
if self._y is not None and self._y_norm is None:
self._y_norm = \
(np.asarray(self._y)-self._norm[0]) / self._norm[1]
self._y_range = 1.0
for name in self._linear_parameters:
par = self._parameters[name]
if par.expr is None:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' not in name:
value = par.value/self._norm[1]
_min = par.min
_max = par.max
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min /= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max /= self._norm[1]
par.set(value=value, min=_min, max=_max)
self._normalized = True
def _renormalize(self):
"""Renormalize the data and results."""
if self._norm is None or not self._normalized:
return
self._normalized = False
for name in self._linear_parameters:
par = self._parameters[name]
if par.expr is None:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' not in name:
value = par.value*self._norm[1]
_min = par.min
_max = par.max
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min *= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max *= self._norm[1]
par.set(value=value, min=_min, max=_max)
if self._result is None:
return
self._result.best_fit = (
self._result.best_fit*self._norm[1] + self._norm[0])
for name, par in self._result.params.items():
if name in self._linear_parameters:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' in name:
if self._code == 'scipy':
if par.stderr is not None:
setattr(par, '_stderr', par.stderr)
if par.expr is None and par.init_value is not None:
setattr(par, '_init_value',par.init_value)
else:
if par.stderr is not None:
if self._code == 'scipy':
setattr(
par, '_stderr', par.stderr*self._norm[1])
else:
par.stderr *= self._norm[1]
if par.expr is None:
value = par.value*self._norm[1]
if par.init_value is not None:
if self._code == 'scipy':
setattr(par, '_init_value',
par.init_value*self._norm[1])
else:
par.init_value *= self._norm[1]
_min = par.min
_max = par.max
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min *= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max *= self._norm[1]
par.set(value=value, min=_min, max=_max)
if hasattr(self._result, 'init_fit'):
self._result.init_fit = (
self._result.init_fit*self._norm[1] + self._norm[0])
if hasattr(self._result, 'init_values'):
init_values = {}
for name, value in self._result.init_values.items():
if name in self._linear_parameters and 'fraction' not in name:
init_values[name] = value*self._norm[1]
else:
init_values[name] = value
self._result.init_values = init_values
if (hasattr(self._result, 'init_params')
and self._result.init_params is not None):
for name, par in self._result.init_params.items():
if par.expr is None and name in self._linear_parameters:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' not in name:
value = par.value*self._norm[1]
_min = par.min
_max = par.max
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min *= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max *= self._norm[1]
par.set(value=value, min=_min, max=_max)
if self._code == 'scipy':
setattr(par, '_init_value', par.value)
else:
par.init_value = par.value
# Don't renormalize chisqr, it has no useful meaning in
# physical units
# self._result.chisqr *= self._norm[1]*self._norm[1]
if self._result.covar is not None:
for i, name in enumerate(self._result.var_names):
if 'fraction' in name:
nnorm = 1
else:
nnorm = self._norm[1]
if name in self._linear_parameters:
if 'fraction' in name:
norm_sq = nnorm
else:
norm_sq = nnorm * self._norm[1]
for j in range(len(self._result.var_names)):
if self._result.covar[i,j] is not None:
self._result.covar[i,j] *= norm_sq
if self._result.covar[j,i] is not None:
self._result.covar[j,i] *= norm_sq
# Don't renormalize redchi, it has no useful meaning in
# physical units
# self._result.redchi *= self._norm[1]*self._norm[1]
if self._result.residual is not None:
self._result.residual *= self._norm[1]
def _reset_par_at_boundary(self):
fraction = 0.02
for name, par in self._parameters.items():
if par.vary:
value = par.value
_min = self._parameter_bounds[name]['min']
_max = self._parameter_bounds[name]['max']
if np.isinf(_min):
if not np.isinf(_max):
if name in self._linear_parameters:
upp = _max - fraction*self._y_range
elif _max == 0.0:
upp = _max - fraction
else:
upp = _max - fraction*abs(_max)
if value >= upp:
par.set(value=upp)
else:
if np.isinf(_max):
if name in self._linear_parameters:
low = _min + fraction*self._y_range
elif _min == 0.0:
low = _min + fraction
else:
low = _min + fraction*abs(_min)
if value <= low:
par.set(value=low)
else:
low = (1.0-fraction)*_min + fraction*_max
upp = fraction*_min + (1.0-fraction)*_max
if value <= low:
par.set(value=low)
if value >= upp:
par.set(value=upp)
def _residual(self, pars, x, y):
res = np.zeros((x.size))
n_par = len(self._free_parameters)
for par, index in zip(pars, self._res_par_indices):
self._res_par_values[index] = par
if self._res_par_exprs:
for par, name in zip(pars, self._res_par_names):
self._ast.symtable[name] = par
for expr in self._res_par_exprs:
self._res_par_values[expr['index']] = \
self._ast.eval(expr['expr'])
for component, num_par in zip(
self._model.components, self._res_num_pars):
res += component.func(
x, *tuple(self._res_par_values[n_par:n_par+num_par]))
n_par += num_par
return res - y
[docs]
class FitMap(Fit):
"""Wrapper to the Fit class to fit data on a N-dimensional map."""
def __init__(self, nxdata, config, logger):
"""Initialize FitMap.
:param nxdata: The input data.
:type nxdata: nexusformat.nexus.NXdata
:param config: Fit configuration.
:type config: CHAP.utils.models.FitConfig, optional
:param logger: A python Logger object.
:type logger: logging.Logger
"""
super().__init__(None, config, logger)
self._best_errors = None
self._best_fit = None
self._best_parameters = None
self._best_values = None
self._best_vary = None
self._inv_transpose = None
self._max_nfev = None
self._memfolder = config.memfolder
self._new_parameters = None
self._num_func_eval = None
self._out_of_bounds = None
self._plot = False
self._print_report = False
self._redchi = None
self._redchi_cutoff = 0.1
self._rel_height_cutoff = None
self._skip_init = True
self._success = None
self._try_no_bounds = True
# At this point the fastest index should always be the signal
# dimension so that the slowest ndim-1 dimensions are the
# map dimensions
self._x = np.asarray(nxdata[nxdata.attrs['axes'][-1]])
self._ymap = np.asarray(nxdata.nxsignal)
# Check input parameters
if self._x.ndim != 1:
raise ValueError(f'Invalid x dimension ({self._x.ndim})')
if self._x.size != self._ymap.shape[-1]:
raise ValueError(
f'Inconsistent x and y dimensions ({self._x.size} vs '
f'{self._ymap.shape[-1]})')
# Flatten the map
# Store the flattened map in self._ymap_norm
self._map_dim = int(self._ymap.size/self._x.size)
self._map_shape = self._ymap.shape[:-1]
self._ymap_norm = np.reshape(
self._ymap, (self._map_dim, self._x.size))
# Check if a mask is provided
# if 'mask' in kwargs:
# self._mask = kwargs.pop('mask')
if True: #self._mask is None:
ymap_min = float(self._ymap_norm.min())
ymap_max = float(self._ymap_norm.max())
else:
ymap_min = None
ymap_max = None
# self._mask = np.asarray(self._mask).astype(bool)
# if self._x.size != self._mask.size:
# raise ValueError(
# f'Inconsistent mask dimension ({self._x.size} vs '
# f'{self._mask.size})')
# ymap_masked = np.asarray(self._ymap_norm)[:,~self._mask]
# ymap_min = float(ymap_masked.min())
# ymap_max = float(ymap_masked.max())
# Normalize the data
self._y_range = ymap_max-ymap_min
if self._y_range > 0.0:
self._norm = (ymap_min, self._y_range)
self._ymap_norm = (self._ymap_norm-self._norm[0]) / self._norm[1]
else:
self._redchi_cutoff *= self._y_range**2
# Setup fit model
self._setup_fit_model(config.parameters, config.models)
@property
def best_errors(self):
"""Return errors in the best fit parameters.
:type: numpy.ndarray
"""
return self._best_errors
@property
def best_fit(self):
"""Return the best fits.
:type: numpy.ndarray
"""
return self._best_fit
@property
def best_values(self):
"""Return values for the best fit parameters.
:type: numpy.ndarray
"""
return self._best_values
@property
def best_vary(self):
"""Return vary parameters for the best fit parameters.
:type: numpy.ndarray
"""
return self._best_vary
@property
def chisqr(self):
"""Return the chisqr value for each best fit.
:type: numpy.ndarray
"""
self._logger.warning('Undefined property chisqr')
@property
def components(self):
"""Return the fit model components info.
:type: dict
"""
# Third party modules
from lmfit.models import ExpressionModel
components = {}
if self._result is None:
self._logger.warning(
'Unable to collect components in FitMap.components')
return components
for component in self._result.components:
if 'tmp_normalization_offset_c' in component.param_names:
continue
parameters = {}
for name in component.param_names:
if self._parameters[name].vary:
parameters[name] = {'free': True}
elif self._parameters[name].expr is not None:
parameters[name] = {
'free': False,
'expr': self._parameters[name].expr,
}
else:
parameters[name] = {
'free': False,
'value': self.init_parameters[name]['value'],
}
expr = None
if isinstance(component, ExpressionModel):
name = component._name
if name[-1] == '_':
name = name[:-1]
expr = component.expr
else:
prefix = component.prefix
if prefix:
if prefix[-1] == '_':
prefix = prefix[:-1]
name = f'{prefix} ({component._name})'
else:
name = f'{component._name}'
if expr is None:
components[name] = {'parameters': parameters}
else:
components[name] = {'expr': expr, 'parameters': parameters}
return components
@property
def covar(self):
"""Return the covarience matrices for the best fit parameters.
:type: numpy.ndarray
"""
self._logger.warning('Undefined property covar')
@property
def max_nfev(self):
"""Return if the maximum number of function evaluations is
reached for each fit.
:type: numpy.ndarray
"""
return self._max_nfev
@property
def num_func_eval(self):
"""Return the number of function evaluations for each best fit.
:type: numpy.ndarray
"""
return self._num_func_eval
@property
def out_of_bounds(self):
"""Return the out_of_bounds flag values for each best fit.
:type: numpy.ndarray
"""
return self._out_of_bounds
@property
def redchi(self):
"""Return the redchi value for each best fit.
:type: numpy.ndarray
"""
return self._redchi
@property
def residual(self):
"""Return the residual in each best fit.
:type: numpy.ndarray
"""
if self.best_fit is None:
return None
if self._mask is None:
residual = np.asarray(self._ymap)-self.best_fit
else:
ymap_flat = np.reshape(
np.asarray(self._ymap), (self._map_dim, self._x.size))
ymap_flat_masked = ymap_flat[:,~self._mask]
ymap_masked = np.reshape(
ymap_flat_masked,
list(self._map_shape) + [ymap_flat_masked.shape[-1]])
residual = ymap_masked-self.best_fit
return residual
@property
def success(self):
"""Return the success value for each fit.
:type: bool
"""
return self._success
@property
def var_names(self):
"""Return the variable names for the covarience matrix
property.
:type: list[str]
"""
self._logger.warning('Undefined property var_names')
@property
def y(self):
"""Return the input y-coordinates.
:type: numpy.ndarray
"""
self._logger.warning('Undefined property y')
@property
def ymap(self):
"""Return the input y-coordinates map.
:type: numpy.ndarray
"""
return self._ymap
[docs]
def best_parameters(self, dims=None):
"""Return the best fit parameters.
:param dims: Map indices of the best fit parameters to return,
defaults to `None` which will return the list of the best
parameter names in the correct order for the results.
:type dims: list or tuple, optional
:return: Best fit parameters.
:rtype: list[str] or dict
"""
if dims is None:
return self._best_parameters
if (not isinstance(dims, (list, tuple))
or len(dims) != len(self._map_shape)):
raise ValueError('Invalid parameter dims ({dims})')
if self.best_values is None or self.best_errors is None:
self._logger.warning(
f'Unable to obtain best parameter values for dims = {dims}')
return {}
# Create current parameters
parameters = deepcopy(self._parameters)
for n, name in enumerate(self._best_parameters):
if self._parameters[name].vary:
parameters[name].set(value=self.best_values[n][dims])
parameters[name].stderr = self.best_errors[n][dims]
parameters_dict = {}
for name in sorted(parameters):
if name != 'tmp_normalization_offset_c':
par = parameters[name]
parameters_dict[name] = {
'value': par.value,
'error': par.stderr,
'init_value': self.init_parameters[name]['value'],
'min': par.min,
'max': par.max,
'vary': par.vary,
'expr': par.expr,
}
return parameters_dict
[docs]
def freemem(self):
"""Free memory allocated for parallel processing."""
if self._memfolder is None:
return
try:
rmtree(self._memfolder)
except Exception:
self._logger.warning('Could not clean-up automatically.')
[docs]
def plot(
self, dims=None, *, y_title=None, plot_comp_legends=False,
plot_residual=False, plot_masked_data=True, **kwargs):
"""Plot the best fits.
:param dims: Map indices of the data point to plot,
defaults to `None` which will plot the first data point.
:type dims: list or tuple, optional
:param y_title: y-axis label.
:type y_title: str, optional
:param plot_comp_legends: Add a legend for the individual
model components, defaults to `False`.
:type plot_comp_legends: bool, optional
:param plot_residual: Plot the residual, defaults to `False`.
:type plot_residual: bool, optional
:param plot_masked_data:
:type plot_masked_data: bool, optional
:param kwargs: Additional key, value pairs to pass on directly
to the Matplotlib plot function.
:type kwargs: dict
"""
# Third party modules
from lmfit.models import ExpressionModel
if dims is None:
dims = [0]*len(self._map_shape)
if (not isinstance(dims, (list, tuple))
or len(dims) != len(self._map_shape)):
raise ValueError('Invalid parameter dims ({dims})')
dims = tuple(dims)
if (self._result is None or self.best_fit is None
or self.best_values is None):
self._logger.warning(
f'Unable to plot fit for dims = {dims}')
return
if y_title is None or not isinstance(y_title, str):
y_title = 'data'
if self._mask is None:
mask = np.zeros(self._x.size).astype(bool)
plot_masked_data = False
else:
mask = self._mask
plots = [(self._x, np.asarray(self._ymap[dims]), 'b.')]
legend = [y_title]
if plot_masked_data:
plots += \
[(self._x[mask], np.asarray(self._ymap)[(*dims,mask)], 'bx')]
legend += ['masked data']
plots += [(self._x[~mask], self.best_fit[dims], 'k-')]
legend += ['best fit']
if plot_residual:
plots += [(self._x[~mask], self.residual[dims], 'r--')]
legend += ['residual']
# Create current parameters
parameters = deepcopy(self._parameters)
for name in self._best_parameters:
if self._parameters[name].vary:
parameters[name].set(
value=self.best_values[self._best_parameters.index(name)]
[dims])
for component in self._result.components:
if 'tmp_normalization_offset_c' in component.param_names:
continue
if isinstance(component, ExpressionModel):
prefix = component._name
if prefix[-1] == '_':
prefix = prefix[:-1]
modelname = f'{prefix}: {component.expr}'
else:
prefix = component.prefix
if prefix:
if prefix[-1] == '_':
prefix = prefix[:-1]
modelname = f'{prefix} ({component._name})'
else:
modelname = f'{component._name}'
if len(modelname) > 20:
modelname = f'{modelname[0:16]} ...'
y = component.eval(params=parameters, x=self._x[~mask])
if isinstance(y, (int, float)):
y *= np.ones(self._x[~mask].size)
plots += [(self._x[~mask], y, '--')]
if plot_comp_legends:
legend.append(modelname)
quick_plot(
tuple(plots), legend=legend, title=str(dims), block=True, **kwargs)
[docs]
def fit(self, config=None, **kwargs):
"""Fit the model to the input data.
:param config: Fit configuration.
:type config: CHAP.utils.models.FitConfig, optional
:param kwargs: Additional key, value pairs to pass on directly
to the core fit routine.
:type kwargs: dict
"""
# Check input parameters
if self._model is None:
self._logger.error('Undefined fit model')
num_proc_max = max(1, cpu_count())
if config is None:
num_proc = kwargs.pop('num_proc', num_proc_max)
self._rel_height_cutoff = kwargs.pop('rel_height_cutoff')
self._try_no_bounds = kwargs.pop('try_no_bounds', False)
self._redchi_cutoff = kwargs.pop('redchi_cutoff', 0.1)
self._print_report = kwargs.pop('print_report', False)
self._plot = kwargs.pop('plot', False)
self._skip_init = kwargs.pop('skip_init', True)
else:
num_proc = config.num_proc
self._rel_height_cutoff = config.rel_height_cutoff
# self._try_no_bounds = config.try_no_bounds
# self._redchi_cutoff = config.redchi_cutoff
self._print_report = config.print_report
self._plot = config.plot
# self._skip_init = config.skip_init
if num_proc > 1 and not HAVE_JOBLIB:
self._logger.warning(
'Missing joblib in the conda environment, running serially')
num_proc = 1
if num_proc > num_proc_max:
self._logger.warning(
f'The requested number of processors ({num_proc}) exceeds the '
'maximum allowed number of processors, num_proc reduced to '
f'{num_proc_max}')
num_proc = num_proc_max
self._logger.debug(f'Using {num_proc} processors to fit the data')
self._redchi_cutoff *= self._y_range**2
# Setup the fit
self._setup_fit(config)
# Create the best parameter list, consisting of all varying
# parameters plus the expression parameters in order to collect
# their errors
if self._result is None:
# Initial fit
assert self._best_parameters is None
self._best_parameters = [
name for name, par in self._parameters.items()
if par.vary or par.expr is not None]
num_new_parameters = 0
else:
# Refit
assert self._best_parameters
self._new_parameters = [
name for name, par in self._parameters.items()
if name != 'tmp_normalization_offset_c'
and name not in self._best_parameters
and (par.vary or par.expr is not None)]
num_new_parameters = len(self._new_parameters)
num_best_parameters = len(self._best_parameters)
# Flatten and normalize the best values of the previous fit,
# remove the remaining results of the previous fit
if self._result is not None:
self._out_of_bounds = None
self._max_nfev = None
self._num_func_eval = None
self._redchi = None
self._success = None
self._best_fit = None
self._best_errors = None
self._best_vary = None
assert self._best_values is not None
assert self._best_values.shape[0] == num_best_parameters
assert self._best_values.shape[1:] == self._map_shape
self._best_values = [
np.reshape(self._best_values[i], self._map_dim)
for i in range(num_best_parameters)]
if self._norm is not None:
for i, name in enumerate(self._best_parameters):
if (name in self._linear_parameters
and 'fraction' not in name):
self._best_values[i] /= self._norm[1]
# Normalize the initial parameters
# (and best values for a refit)
self._normalize()
# Initialize parameter bounds and check to prevent initial
# values at boundaries
self._parameter_bounds = {
name:{'min': par.min, 'max': par.max}
for name, par in self._parameters.items() if par.vary}
self._reset_par_at_boundary()
# Set parameter bounds to unbound
# (only use bounds when fit fails)
if 'fraction' in self._parameters and self._try_no_bounds:
self._logger.warning(
'Setting self._try_no_bounds to False for PseudoVoigt model')
self._try_no_bounds = False
if self._try_no_bounds:
for name in self._parameter_bounds.keys():
self._parameters[name].set(min=-np.inf, max=np.inf)
# Allocate memory to store fit results
if self._mask is None:
x_size = self._x.size
else:
x_size = self._x[~self._mask].size
if num_proc == 1:
self._out_of_bounds_flat = np.zeros(self._map_dim, dtype=bool)
self._max_nfev_flat = np.zeros(self._map_dim, dtype=bool)
self._num_func_eval_flat = np.zeros(self._map_dim, dtype=np.intc)
self._redchi_flat = np.zeros(self._map_dim, dtype=np.float64)
self._success_flat = np.zeros(self._map_dim, dtype=bool)
self._best_fit_flat = np.zeros(
(self._map_dim, x_size), dtype=self._ymap_norm.dtype)
self._best_errors_flat = [
np.zeros(self._map_dim, dtype=np.float64)
for _ in range(num_best_parameters+num_new_parameters)]
self._best_vary_flat = [
np.zeros(self._map_dim, dtype=bool)
for _ in range(num_best_parameters+num_new_parameters)]
if self._result is None:
self._best_values_flat = [
np.zeros(self._map_dim, dtype=np.float64)
for _ in range(num_best_parameters)]
else:
self._best_values_flat = self._best_values
self._best_values_flat += [
np.zeros(self._map_dim, dtype=np.float64)
for _ in range(num_new_parameters)]
else:
try:
mkdir(self._memfolder)
except FileExistsError:
pass
filename_memmap = path.join(
self._memfolder, 'out_of_bounds_memmap')
self._out_of_bounds_flat = np.memmap(
filename_memmap, dtype=bool, shape=(self._map_dim), mode='w+')
filename_memmap = path.join(self._memfolder, 'max_nfev_memmap')
self._max_nfev_flat = np.memmap(
filename_memmap, dtype=bool, shape=(self._map_dim), mode='w+')
filename_memmap = path.join(
self._memfolder, 'num_func_eval_memmap')
self._num_func_eval_flat = np.memmap(
filename_memmap, dtype=np.intc, shape=(self._map_dim),
mode='w+')
filename_memmap = path.join(self._memfolder, 'redchi_memmap')
self._redchi_flat = np.memmap(
filename_memmap, dtype=np.float64, shape=(self._map_dim),
mode='w+')
filename_memmap = path.join(self._memfolder, 'success_memmap')
self._success_flat = np.memmap(
filename_memmap, dtype=bool, shape=(self._map_dim), mode='w+')
filename_memmap = path.join(self._memfolder, 'best_fit_memmap')
self._best_fit_flat = np.memmap(
filename_memmap, dtype=self._ymap_norm.dtype,
shape=(self._map_dim, x_size), mode='w+')
self._best_errors_flat = []
for i in range(num_best_parameters+num_new_parameters):
filename_memmap = path.join(
self._memfolder, f'best_errors_memmap_{i}')
self._best_errors_flat.append(
np.memmap(filename_memmap, dtype=np.float64,
shape=self._map_dim, mode='w+'))
self._best_vary_flat = []
for i in range(num_best_parameters+num_new_parameters):
filename_memmap = path.join(
self._memfolder, f'best_errors_memmap_{i}')
self._best_vary_flat.append(
np.memmap(filename_memmap, dtype=bool,
shape=self._map_dim, mode='w+'))
self._best_values_flat = []
for i in range(num_best_parameters):
filename_memmap = path.join(
self._memfolder, f'best_values_memmap_{i}')
self._best_values_flat.append(
np.memmap(filename_memmap, dtype=np.float64,
shape=self._map_dim, mode='w+'))
if self._result is not None:
self._best_values_flat[i][:] = self._best_values[i][:]
for i in range(num_new_parameters):
filename_memmap = path.join(
self._memfolder,
f'best_values_memmap_{i+num_best_parameters}')
self._best_values_flat.append(
np.memmap(filename_memmap, dtype=np.float64,
shape=self._map_dim, mode='w+'))
# Update the best parameter list
if num_new_parameters:
self._best_parameters += self._new_parameters
# Perform the first fit to get model component info and
# initial parameters
current_best_values = {}
self._result = self._fit(
0, current_best_values, return_result=True, **kwargs)
# Remove all irrelevant content from self._result
for attr in (
'_abort', 'aborted', 'aic', 'best_fit', 'best_values', 'bic',
'calc_covar', 'call_kws', 'chisqr', 'ci_out', 'col_deriv',
'covar', 'data', 'errorbars', 'flatchain', 'ier', 'init_vals',
'init_fit', 'iter_cb', 'jacfcn', 'kws', 'last_internal_values',
'lmdif_message', 'message', 'method', 'nan_policy', 'ndata',
'nfev', 'nfree', 'params', 'redchi', 'reduce_fcn', 'residual',
'result', 'scale_covar', 'show_candidates', 'calc_covar',
'success', 'userargs', 'userfcn', 'userkws', 'values',
'var_names', 'weights', 'user_options'):
try:
delattr(self._result, attr)
except AttributeError:
pass
if self._map_dim > 1:
if num_proc == 1:
# Perform the remaining fits serially
for n in range(1, self._map_dim):
self._fit(n, current_best_values, **kwargs)
else:
# Perform the remaining fits in parallel
num_fit = self._map_dim-1
if num_proc > num_fit:
self._logger.warning(
f'The requested number of processors ({num_proc}) '
'exceeds the number of fits, num_proc reduced to '
f'{num_fit}')
num_proc = num_fit
num_fit_per_proc = 1
else:
num_fit_per_proc = round((num_fit)/num_proc)
if num_proc*num_fit_per_proc < num_fit:
num_fit_per_proc += 1
num_fit_batch = min(num_fit_per_proc, 40)
with Parallel(n_jobs=num_proc) as parallel:
parallel(
delayed(self._fit_parallel)
(current_best_values, num_fit_batch, n_start,
**kwargs)
for n_start in range(1, self._map_dim, num_fit_batch))
# Renormalize the initial parameters for external use
if self._norm is not None and self._normalized:
if hasattr(self._result, 'init_values'):
init_values = {}
for name, value in self._result.init_values.items():
if (name in self._nonlinear_parameters
or self._parameters[name].expr is not None):
init_values[name] = value
elif 'fraction' not in name:
init_values[name] = value*self._norm[1]
self._result.init_values = init_values
if (hasattr(self._result, 'init_params')
and self._result.init_params is not None):
for name, par in self._result.init_params.items():
if par.expr is None and name in self._linear_parameters:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' not in name:
value = par.value*self._norm[1]
_min = par.min
_max = par.max
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min *= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max *= self._norm[1]
par.set(value=value, min=_min, max=_max)
if self._code == 'scipy':
setattr(par, '_init_value', par.value)
else:
par.init_value = par.value
# Remap the best results
self._out_of_bounds = np.copy(np.reshape(
self._out_of_bounds_flat, self._map_shape))
self._max_nfev = np.copy(np.reshape(
self._max_nfev_flat, self._map_shape))
self._num_func_eval = np.copy(np.reshape(
self._num_func_eval_flat, self._map_shape))
self._redchi = np.copy(np.reshape(self._redchi_flat, self._map_shape))
self._success = np.copy(np.reshape(
self._success_flat, self._map_shape))
self._best_fit = np.copy(np.reshape(
self._best_fit_flat, list(self._map_shape)+[x_size]))
self._best_values = np.asarray([np.reshape(
par, list(self._map_shape)) for par in self._best_values_flat])
self._best_errors = np.asarray([np.reshape(
par, list(self._map_shape)) for par in self._best_errors_flat])
self._best_vary = np.asarray([np.reshape(
par, list(self._map_shape)) for par in self._best_vary_flat])
if self._inv_transpose is not None:
self._out_of_bounds = np.transpose(
self._out_of_bounds, self._inv_transpose)
self._max_nfev = np.transpose(self._max_nfev, self._inv_transpose)
self._num_func_eval = np.transpose(
self._num_func_eval, self._inv_transpose)
self._redchi = np.transpose(self._redchi, self._inv_transpose)
self._success = np.transpose(self._success, self._inv_transpose)
self._best_fit = np.transpose(
self._best_fit,
list(self._inv_transpose) + [len(self._inv_transpose)])
self._best_values = np.transpose(
self._best_values, [0] + [i+1 for i in self._inv_transpose])
self._best_errors = np.transpose(
self._best_errors, [0] + [i+1 for i in self._inv_transpose])
self._best_vary = np.transpose(
self._best_vary, [0] + [i+1 for i in self._inv_transpose])
del self._out_of_bounds_flat
del self._max_nfev_flat
del self._num_func_eval_flat
del self._redchi_flat
del self._success_flat
del self._best_fit_flat
del self._best_values_flat
del self._best_errors_flat
del self._best_vary_flat
# Restore parameter bounds and renormalize the parameters
for name, par in self._parameter_bounds.items():
self._parameters[name].set(min=par['min'], max=par['max'])
self._normalized = False
if self._norm is not None:
for name in self._linear_parameters:
par = self._parameters[name]
if par.expr is None:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' not in name:
value = par.value*self._norm[1]
_min = par.min
_max = par.max
if not np.isinf(_min) and abs(_min) != FLOAT_MIN:
_min *= self._norm[1]
if not np.isinf(_max) and abs(_max) != FLOAT_MIN:
_max *= self._norm[1]
par.set(value=value, min=_min, max=_max)
if num_proc > 1:
# Free the shared memory
self.freemem()
def _fit_parallel(self, current_best_values, num, n_start, **kwargs):
num = min(num, self._map_dim-n_start)
for n in range(num):
self._fit(n_start+n, current_best_values, **kwargs)
def _fit(self, n, current_best_values, return_result=False, **kwargs):
# Do not attempt a fit if the data is zero or entirely below
# the cutoff
y_max = self._ymap_norm[n].max()
if (y_max == 0.0
or (self._rel_height_cutoff is not None
and y_max < self._rel_height_cutoff)):
self._logger.debug(f'Skipping fit {n} (rel norm = {y_max:.5f})')
if self._code == 'scipy':
# Local modules
from CHAP.utils.fit import ModelResult
result = ModelResult(self._model, deepcopy(self._parameters))
else:
# Third party modules
from lmfit.model import ModelResult
result = ModelResult(self._model, deepcopy(self._parameters))
result.success = False
# Renormalize the data and results
self._renormalize(n, result)
return result
# Regular full fit
result = self._fit_with_bounds_check(n, current_best_values, **kwargs)
if self._rel_height_cutoff is not None:
# Check for low heights peaks and refit without them
heights = []
names = []
for component in result.components:
if component._name in ('gaussian', 'lorentzian'):
for name in component.param_names:
if 'height' in name:
heights.append(result.params[name].value)
names.append(name)
if heights:
refit = False
max_height = max(heights)
parameters_save = deepcopy(self._parameters)
parameters_bounds_save = deepcopy(self._parameter_bounds)
for i, (name, height) in enumerate(zip(names, heights)):
if height < self._rel_height_cutoff*max_height:
self._parameters[
name.replace('height', 'amplitude')].set(
value=0.0, min=0.0, vary=False)
self._parameters[
name.replace('height', 'center')].set(
vary=False)
self._parameters[
name.replace('height', 'sigma')].set(
value=0.0, min=0.0, vary=False)
refit = True
if refit:
result = self._fit_with_bounds_check(
n, current_best_values, **kwargs)
# Reset fixed amplitudes back to default
self._parameters = deepcopy(parameters_save)
self._parameter_bounds = deepcopy(parameters_bounds_save)
if result.redchi >= self._redchi_cutoff:
result.success = False
self._num_func_eval_flat[n] = result.nfev
if result.nfev == result.max_nfev:
if result.redchi < self._redchi_cutoff:
result.success = True
self._max_nfev_flat[n] = True
if result.success:
assert all(
True for par in current_best_values
if par in result.params.values())
for par in result.params.values():
if par.vary:
current_best_values[par.name] = par.value
else:
errortxt = f'Fit for n = {n} failed'
if hasattr(result, 'lmdif_message'):
errortxt += f'\n\t{result.lmdif_message}'
if hasattr(result, 'message'):
errortxt += f'\n\t{result.message}'
self._logger.warning(f'{errortxt}')
# Renormalize the data and results
self._renormalize(n, result)
if self._print_report:
print(result.fit_report(show_correl=False))
if self._plot:
dims = np.unravel_index(n, self._map_shape)
if self._inv_transpose is not None:
dims = tuple(
dims[self._inv_transpose[i]] for i in range(len(dims)))
super().plot(
result=result, y=np.asarray(self._ymap[dims]),
plot_comp_legends=True, skip_init=self._skip_init,
title=str(dims))
if return_result:
return result
return None
def _fit_with_bounds_check(self, n, current_best_values, **kwargs):
# Set parameters to current best values, but prevent them from
# sitting at boundaries
if self._new_parameters is None:
# Initial fit
for name, value in current_best_values.items():
par = self._parameters[name]
if par.vary:
par.set(value=value)
else:
# Refit
for i, name in enumerate(self._best_parameters):
par = self._parameters[name]
if par.vary:
if name in self._new_parameters:
if name in current_best_values:
par.set(value=current_best_values[name])
elif par.expr is None:
par.set(value=self._best_values[i][n])
self._reset_par_at_boundary()
result = self._fit_nonlinear_model(
self._x, self._ymap_norm[n], **kwargs)
out_of_bounds = False
for name, par in self._parameter_bounds.items():
if self._parameters[name].vary:
value = result.params[name].value
if not np.isinf(par['min']) and value < par['min']:
out_of_bounds = True
break
if not np.isinf(par['max']) and value > par['max']:
out_of_bounds = True
break
self._out_of_bounds_flat[n] = out_of_bounds
if self._try_no_bounds and out_of_bounds:
# Rerun fit with parameter bounds in place
for name, par in self._parameter_bounds.items():
if self._parameters[name].vary:
self._parameters[name].set(min=par['min'], max=par['max'])
# Set parameters to current best values, but prevent them
# from sitting at boundaries
if self._new_parameters is None:
# Initial fit
for name, value in current_best_values.items():
par = self._parameters[name]
if par.vary:
par.set(value=value)
else:
# Refit
for i, name in enumerate(self._best_parameters):
par = self._parameters[name]
if par.vary:
if name in self._new_parameters:
if name in current_best_values:
par.set(value=current_best_values[name])
elif par.expr is None:
par.set(value=self._best_values[i][n])
self._reset_par_at_boundary()
result = self._fit_nonlinear_model(
self._x, self._ymap_norm[n], **kwargs)
out_of_bounds = False
for name, par in self._parameter_bounds.items():
if self._parameters[name].vary:
value = result.params[name].value
if not np.isinf(par['min']) and value < par['min']:
out_of_bounds = True
break
if not np.isinf(par['max']) and value > par['max']:
out_of_bounds = True
break
# Reset parameters back to unbound
self._parameters[name].set(min=-np.inf, max=np.inf)
assert not out_of_bounds
return result
def _renormalize(self, n, result):
self._success_flat[n] = result.success
if result.success:
self._redchi_flat[n] = np.float64(result.redchi)
if self._norm is None or not self._normalized:
for i, name in enumerate(self._best_parameters):
self._best_values_flat[i][n] = np.float64(
result.params[name].value)
self._best_errors_flat[i][n] = np.float64(
result.params[name].stderr)
self._best_vary_flat[i][n] = (
result.params[name].vary and result.success)
if result.success:
self._best_fit_flat[n] = result.best_fit
else:
for name, par in result.params.items():
if name in self._linear_parameters:
# FIX not quite safe if fraction occurs in non pvoigt
if 'fraction' in name:
if self._code == 'scipy':
if par.stderr is not None:
setattr(par, '_stderr', par.stderr)
if (par.expr is None and self._print_report
and par.init_value is not None):
setattr(par, '_init_value', par.init_value)
else:
if par.stderr is not None:
if self._code == 'scipy':
setattr(
par, '_stderr', par.stderr*self._norm[1])
else:
par.stderr *= self._norm[1]
if par.expr is None:
par.value *= self._norm[1]
if self._print_report:
if par.init_value is not None:
if self._code == 'scipy':
setattr(par, '_init_value',
par.init_value*self._norm[1])
else:
par.init_value *= self._norm[1]
if (not np.isinf(par.min)
and abs(par.min) != FLOAT_MIN):
par.min *= self._norm[1]
if (not np.isinf(par.max)
and abs(par.max) != FLOAT_MIN):
par.max *= self._norm[1]
for i, name in enumerate(self._best_parameters):
self._best_values_flat[i][n] = np.float64(
result.params[name].value)
self._best_errors_flat[i][n] = np.float64(
result.params[name].stderr)
self._best_vary_flat[i][n] = (
result.params[name].stderr and result.success)
if result.success:
self._best_fit_flat[n] = (
result.best_fit*self._norm[1] + self._norm[0])
if self._plot:
if not self._skip_init:
result.init_fit = (
result.init_fit*self._norm[1] + self._norm[0])
result.best_fit = np.copy(self._best_fit_flat[n])