Source code for CHAP.common.models.integration

"""`pyFAI <https://pyfai.readthedocs.io/en/stable/>`__ integration
related `Pydantic <https://github.com/pydantic/pydantic>`__ model
configuration classes."""

# System modules
from copy import deepcopy
import os
from typing import (
    Literal,
    Optional,
    Union,
)

# Third party modules
import numpy as np
from pydantic import (
    FilePath,
    PrivateAttr,
    confloat,
    conint,
    conlist,
    constr,
    field_validator,
    model_validator,
)
from pyFAI.integrator.azimuthal import AzimuthalIntegrator

# Local modules
from CHAP.common.models.map import Detector
from CHAP.models import CHAPBaseModel


[docs] class AzimuthalIntegratorConfig(Detector, CHAPBaseModel): """Azimuthal integrator configuration class to represent a single detector used in the experiment. :ivar mask_file: Path to the mask file. :vartype mask_file: FilePath, optional :ivar poni_file: Path to the PONI file, specify either `poni_file` or `params`, not both. :vartype poni_file: FilePath, optional :ivar params: Azimuthal integrator configuration parameters, specify either `poni_file` or `params`, not both. :vartype params: dict, optional """ mask_file: Optional[FilePath] = None params: Optional[dict] = None poni_file: Optional[FilePath] = None _ai: AzimuthalIntegrator = PrivateAttr()
[docs] @model_validator(mode='before') @classmethod def validate_root(cls, data): """Make sure `data` contains either `poni_file` _or_ `params`, not both, and that the field that is used defines a valid `pyFAI.azimuthalIntegrator.AzimuthalIntegrator <https://www.silx.org/doc/pyFAI/latest/api/pyFAI.html#pyFAI.__init__.AzimuthalIntegrator>`__ object. :param data: `Pydantic <https://github.com/pydantic/pydantic>`__ validator data object. :type data: dict :return: Currently validated class attributes. :rtype: dict """ if isinstance(data, dict): inputdir = data.get('inputdir') mask_file = data.get('mask_file') params = data.get('params') poni_file = data.get('poni_file') if mask_file is not None: if inputdir is not None and not os.path.isabs(mask_file): data['mask_file'] = mask_file if params is not None: if poni_file is not None: print('Specify either poni_file or params, not both, ' 'ignoring poni_file') poni_file = None elif poni_file is not None: if inputdir is not None and not os.path.isabs(poni_file): data['poni_file'] = poni_file else: raise ValueError('Specify either poni_file or params') return data
[docs] @model_validator(mode='after') def validate_ai(self): """Set the default azimuthal integrator. :return: Validated configuration class. :rtype: AzimuthalIntegratorConfig """ if self.params is not None: self._ai = AzimuthalIntegrator(**self.params) elif self.poni_file is not None: # Third party modules from pyFAI import load self._ai = load(str(self.poni_file)) self.params = { 'detector': self._ai.detector.name, 'dist': self._ai.dist, 'poni1': self._ai.poni1, 'poni2': self._ai.poni2, 'rot1': self._ai.rot1, 'rot2': self._ai.rot2, 'rot3': self._ai.rot3, 'wavelength': self._ai.wavelength, } return self
@property def ai(self): """Return the azimuthal integrator. :type: AzimuthalIntegrator """ return self._ai @property def mask_data(self): """Return the mask array to use for this detector from the data in the file specified with the `mask_file` field. Return `None` if `mask_file` is `None`. :type: numpy.ndarray """ # Third party modules import fabio if self.mask_file is None: return None _mask_file = fabio.open(self.mask_file) mask_data = _mask_file.data _mask_file.close() return mask_data
[docs] class MultiGeometryConfig(CHAPBaseModel): """Class representing the configuration for treating simultaneously multiple detector configuration within a single integration :ivar ais: List of detector IDs of azimuthal integrators. :vartype ais: str or list[str] :ivar azimuth_range: Common azimuthal range for integration, defaults to `[-180.0, 180.0]`. :vartype azimuth_range: list[float, float] or tuple[float, float], optional :ivar radial_range: Common range for integration, defaults to `[0.0, 180.0]`. :vartype radial_range: list[float, float] or tuple[float, float], optional :ivar unit: Output unit, defaults to `'q_A^-1'`. :vartype unit: str, optional :ivar chi_disc: chi discontinuity value, defaults to `180`. :vartype chi_disc: int, optional :ivar empty: Value for empty pixels, defaults to `0`. :vartype empty: float, optional :ivar wavelength: Wave length used in meters. :vartype wavelength: float, optional """ ais: conlist( min_length=1, item_type=constr(min_length=1, strip_whitespace=True)) azimuth_range: Optional[ conlist( min_length=2, max_length=2, item_type=confloat(ge=-180, le=360, allow_inf_nan=False)) ] = [-180.0, 180.0] radial_range: Optional[ conlist( min_length=2, max_length=2, item_type=confloat(ge=0, le=180, allow_inf_nan=False)) ] = [0.0, 180.0] unit: Optional[ constr(strip_whitespace=True, min_length=1)] = 'q_A^-1' chi_disc: Optional[int] = 180 empty: Optional[confloat(allow_inf_nan=False)] = 0.0 wavelength: Optional[confloat(allow_inf_nan=False)] = None
[docs] @field_validator('ais', mode='before') @classmethod def validate_ais(cls, ais): """Validate the detector IDs of the azimuthal integrators. :param ais: Detector IDs. :type ais: str, list[str] :return: Detector ais. :rtype: list[str] """ if isinstance(ais, str): return [ais] return ais
class _IntegrateConfig(CHAPBaseModel): """Class with the input parameters to perform various integrations with `pyFAI <https://pyfai.readthedocs.io/en/stable/>`__. :ivar error_model: When the variance is unknown, an error model can be given (ignored for radial integration): `poisson` (variance = I) or `azimuthal` (variance = (I-<I>)^2). :vartype error_model: str, optional """ # correctSolidAngle: true # dark: None error_model: Optional[constr(strip_whitespace=True, min_length=1)] = None # filename: None # flat: None # mask: None # metadata: None # normalization_factor: Optional[confloat(allow_inf_nan=False)] = 1.0 # polarization_factor: None # variance: None attrs: Optional[dict] = {}
[docs] class Integrate1dConfig(_IntegrateConfig): """Class with the input parameters to perform 1D azimuthal integration with `pyFAI <https://pyfai.readthedocs.io/en/stable/>`__. :ivar method: For pyFAI.azimuthalIntegrator.AzimuthalIntegrator a registered integration method or a 3-tuple (splitting, algorithm, implementation), defaults to `csr`. For pyFAI.multi_geometry.MultiGeometry a registered integration method, defaults to `splitpixel`. :vartype method: str or tuple, optional :ivar npt: Number of integration points, defaults to 1800. :vartype npt: int, optional """ method: Optional[Union[ str, conlist( min_length=3, max_length=3, item_type=constr(strip_whitespace=True, min_length=1))]] = None npt: Optional[conint(gt=0)] = 1800
[docs] class Integrate2dConfig(_IntegrateConfig): """Class with the input parameters to perform 2D azimuthal (cake) integration with `pyFAI <https://pyfai.readthedocs.io/en/stable/>`__. :ivar method: Registered integration method, defaults to `bbox` for pyFAI.azimuthalIntegrator.AzimuthalIntegrator or `splitpixel` for pyFAI.multi_geometry.MultiGeometry. :vartype method: str, optional :ivar npt_azim: Number of points for the integration in the azimuthal direction, defaults to 3600. :vartype npt_azim: int, optional :ivar npt_rad: Number of points for the integration in the radial direction, defaults to 1800. :vartype npt_rad: int, optional """ method: Optional[str] = None npt_azim: Optional[conint(gt=0)] = 3600 npt_rad: Optional[conint(gt=0)] = 1800
[docs] class IntegrateRadialConfig(_IntegrateConfig, MultiGeometryConfig): """Class with the input parameters to perform radial integration with `pyFAI <https://pyfai.readthedocs.io/en/stable/>`__. :ivar method: Registered integration method, defaults to `csr`. :vartype method: str, optional :ivar radial_unit: Unit used for radial representation, defaults to `'q_A^-1'`. :vartype radial_unit: str, optional :ivar npt: Number of integration points, defaults to 1800. :vartype npt: int, optional """ radial_unit: Optional[ constr(strip_whitespace=True, min_length=1)] = 'q_A^-1' method: Optional[str] = 'csr' npt: Optional[conint(gt=0)] = 1800
[docs] class PyfaiIntegratorConfig(CHAPBaseModel): """Class representing the configuration for detector data integrator for `pyFAI <https://pyfai.readthedocs.io/en/stable/>`__. :ivar name: Integration type name, e.g. `cake`, or `wedge`. :vartype name: str :ivar integration_method: Integration method. :vartype integration_method: Literal[ 'integrate1d', 'integrate2d', 'integrate_radial', 'integrate2d_grazing_incidence'] :ivar multi_geometry: Multiple detector configuration. :vartype multi_geometry: MultiGeometryConfig :ivar integration_params: Integration parameter configuration. :vartype integration_params: Integrate1dConfig or Integrate2dConfig or IntegrateRadialConfig :ivar right_handed: For radial and cake integration, reverse the direction of the azimuthal coordinate from pyFAI's convention, defaults to `True`. :vartype right_handed: bool, optional """ name: constr(strip_whitespace=True, min_length=1) integration_method: Literal[ 'integrate1d', 'integrate2d', 'integrate_radial'] multi_geometry: Optional[MultiGeometryConfig] = None integration_params: Optional[Union[ Integrate1dConfig, Integrate2dConfig, IntegrateRadialConfig] ] = None right_handed: bool = True _placeholder_result: PrivateAttr = None
[docs] @model_validator(mode='before') @classmethod def validate_config(cls, data): """Validate the input integration configuration. :param data: `Pydantic <https://github.com/pydantic/pydantic>`__ validator data object. :type data: dict :return: Currently validated class attributes. :rtype: dict """ integration_method = data.get('integration_method') integration_params = data.get('integration_params') multi_geometry = data.get('multi_geometry') if integration_method == 'integrate1d': if multi_geometry is None: method = 'csr' else: method = 'splitpixel' if integration_params is None: data['integration_params'] = Integrate1dConfig(method=method) else: if integration_params.get('method') is None: integration_params['method'] = method data['integration_params'] = Integrate1dConfig( **integration_params) elif integration_method == 'integrate2d': if multi_geometry is None: method = 'bbox' else: method = 'splitpixel' if integration_params is None: data['integration_params'] = Integrate2dConfig(method=method) else: if integration_params.get('method') is None: integration_params['method'] = method data['integration_params'] = Integrate2dConfig( **integration_params) elif integration_method == 'integrate_radial': data['integration_params'] = IntegrateRadialConfig( **integration_params) if (integration_method != 'integrate_radial' and 'multi_geometry' not in data): mg = MultiGeometryConfig(**integration_params) data['multi_geometry'] = mg data['integration_params'].attrs.update(mg.model_dump( include={'azimuth_range', 'radial_range', 'unit'})) return data
@property def result_shape(self): """Return shape of one frame of results from this integration. :type: tuple """ if self.integration_method == 'integrate_radial': return (self.integration_params.npt, ) if self.integration_method == 'integrate1d': return (self.integration_params.npt, ) if self.integration_method == 'integrate2d': return (self.integration_params.npt_azim, self.integration_params.npt_rad) raise NotImplementedError( f'Unimplemented integration_method: {self.integration_method}') @property def result_coords(self): """Return a dictionary representing the `zarr.array` objects for the coordinates of a single frame of results from this integration. :type: dict """ if self._placeholder_result is None: raise RuntimeError('Missing placeholder results') return self._placeholder_result['coords']
[docs] def get_axes_indices(self, dataset_ndims): """Return the index of each coordinate orienting a single frame of results from this integration. :type: dict """ return {k: dataset_ndims + i for i, k in enumerate(self.result_coords.keys())}
[docs] def get_placeholder_data(self, ais): """Return empty input data of the correct shape for use in `init_placeholder_data`. :type: dict """ if self.integration_method == 'integrate_radial': return {ai:np.full(ais[ai].ai.detector.shape, 0) for ai in self.integration_params.ais} return {ai:np.full(ais[ai].ai.detector.shape, 0) for ai in self.multi_geometry.ais}
[docs] def init_placeholder_results(self, ais): """Get placeholder results for this integration so we can fill in the datasets for results of coordinates when setting up a zarr tree for holding results of :class:`~CHAP.saxswaxs.PyfaiIntegrationProcessor`. """ self._placeholder_result = self.integrate( ais, self.get_placeholder_data(ais))
[docs] def integrate(self, azimuthal_integrators, data): """Perform the integration and return the results. :param azimuthal_integrators: List of single-detector azimuthal integrator configurations. :type azimuthal_integrators: list[AzimuthalIntegratorConfig] :param data: Dictionary of 2D detector frames to be integrated. :type data: dict[str, np.ndarray] :return: Integrated intensities and coordinates for every frame (or set of frames) in `data`. :rtype: dict[str, object] """ # Third party modules from pyFAI.units import ( AZIMUTHAL_UNITS, RADIAL_UNITS, to_unit, ) ais = {name: ai.ai for name, ai in azimuthal_integrators.items()} if self.integration_method == 'integrate_radial': # Third party modules from pyFAI.containers import Integrate1dResult results = None npts = [] for k, v in data.items(): if v.ndim == 2: data[k] = np.expand_dims(v, 0) elif v.ndim != 3: raise ValueError( f'Illegal dimension for {k} data ({v.ndim})') npts.append(data[k].shape[0]) if not all(n == npts[0] for n in npts): raise RuntimeError('Different number of detector frames for ' f'each azimuthal integrator ({npts})') npts = npts[0] integration_params = self.integration_params.model_dump( exclude={'ais', 'attrs', 'error_model', 'chi_disc', 'empty', 'wavelength'}) for name in self.integration_params.ais: ai = ais[name] mask = azimuthal_integrators[name].mask_data _results = [ ai.integrate_radial( data=data[name][i], mask=mask, **integration_params) for i in range(npts) ] if results is None: results = _results else: results = [ Integrate1dResult( radial=_results[i].radial, intensity= _results[i].intensity + results[i].intensity) for i in range(npts) ] if self.right_handed: results = [ Integrate1dResult( radial=r.radial, intensity=np.flip(r.intensity) ) for r in results ] results = [ Integrate1dResult( radial=r.radial, intensity=np.where(r.intensity==0, np.nan, r.intensity) ) for r in results ] # Integrate1dResult's "radial" property is misleadingly # named here. When using integrate_radial, the property # actually contains azimuthal coordinate values. azimuthal_unit = to_unit( integration_params['unit'], type_=AZIMUTHAL_UNITS) # pyFAI doesn't add the correct unit for chi_deg if azimuthal_unit.name[-3:] == 'deg': azimuthal_unit.unit_symbol = '$^{o}$' coords = { azimuthal_unit.name: { 'attributes': { 'units': azimuthal_unit.unit_symbol, 'long_name': azimuthal_unit.label, }, 'data': results[0].radial, 'shape': results[0].radial.shape, 'dtype': 'float32', }, } results = {'intensities': [v.intensity for v in results], 'coords': coords} else: npts = [] for k, v in data.items(): if v.ndim == 2: data[k] = np.expand_dims(v, 0) elif v.ndim != 3: raise ValueError( f'Illegal dimension for {k} data ({v.ndim})') npts.append(data[k].shape[0]) if not all(n == npts[0] for n in npts): raise RuntimeError('Different number of detector frames for ' f'each azimuthal integrator ({npts})') npts = npts[0] if self.multi_geometry is None: raise RuntimeError('self.multi_geometry is None') if len(data) != 1: raise RuntimeError( 'Multiple detector not tested without multi_geometry') _id = list(ais.keys())[0] ai = list(ais.values())[0] integration_method = getattr(ai, self.integration_method) integration_params = self.integration_params.model_dump() integration_params = { **integration_params, **integration_params['attrs']} del integration_params['attrs'] raise RuntimeError('Check use of mask in integration_method') if masks is None: results = [ integration_method(data[_id][i], **integration_params) for i in range(npts) ] else: results = [ integration_method( np.where( masks[ai], 0, data[_id][i].astype(np.float64), **integration_params)) for i in range(npts) ] else: # Third party modules from pyFAI.multi_geometry import MultiGeometry mg = MultiGeometry( [ais[ai] for ai in self.multi_geometry.ais], **self.multi_geometry.model_dump(exclude={'ais'})) integration_method = getattr(mg, self.integration_method) lst_mask = [] for name in self.multi_geometry.ais: lst_mask.append(azimuthal_integrators[name].mask_data) results = [ integration_method( [data[ai][i].astype(np.float64) for ai in self.multi_geometry.ais], lst_mask=lst_mask, **self.integration_params.model_dump( exclude='attrs')) for i in range(npts) ] if isinstance(self.integration_params, Integrate1dConfig): if self.multi_geometry is None: unit = integration_params['unit'] else: unit = self.multi_geometry.unit radial_unit = to_unit(unit, type_=RADIAL_UNITS) coords = { radial_unit.name: { 'attributes': { 'units': radial_unit.unit_symbol, 'long_name': radial_unit.label, }, 'data': results[0].radial, 'shape': results[0].radial.shape, 'dtype': 'float32', }, } results = {'intensities': [v.intensity for v in results], 'coords': coords} else: # pyFAI doesn't add the correct unit for chi_deg azimuthal_unit = deepcopy(results[0].azimuthal_unit) if azimuthal_unit.name[-3:] == 'deg': azimuthal_unit.unit_symbol = '$^{o}$' coords = { azimuthal_unit.name: { 'attributes': { 'units': azimuthal_unit.unit_symbol, 'long_name': azimuthal_unit.label, }, 'data': results[0].azimuthal, 'shape': results[0].azimuthal.shape, 'dtype': 'float32', }, results[0].radial_unit.name: { 'attributes': { 'units': results[0].radial_unit.unit_symbol, 'long_name': results[0].radial_unit.label, }, 'data': results[0].radial, 'shape': results[0].radial.shape, 'dtype': 'float32', }, } if self.right_handed: intensities = [ np.flip(v.intensity, axis=0) for v in results] else: intensities = [v.intensity for v in results] results = {'intensities': intensities, 'coords': coords} return results
[docs] def zarr_tree(self, dataset_shape, dataset_chunks='auto'): """Return a dictionary representing a `zarr.group` that can be used to contain results from this integration. :return: A `zarr.group` that can be used to contain the integration results. :rtype: dict """ # Third party modules #import json tree = { # NXprocess 'attributes': { 'default': 'data', # 'config': json.dumps(self.dict()) }, 'children': { 'data': { # NXdata 'attributes': { # 'axes': self.result_axes(), **self.get_axes_indices(len(dataset_shape)) }, 'children': { 'I': { # NXfield 'attributes': { 'long_name': 'Intensity (a.u)', 'units': 'a.u' }, 'dtype': 'float64', 'shape': (*dataset_shape, *self.result_shape), 'chunks': (*dataset_chunks, *self.result_shape) if dataset_chunks != 'auto' else 'auto', 'compressors': None, }, **self.result_coords, } } } } return tree
[docs] class PyfaiIntegrationConfig(CHAPBaseModel): """Class defining components needed for performing one or more integrations on the same set of 2D input data with :class:`~CHAP.saxswaxs.PyfaiIntegrationProcessor`. :ivar azimuthal_integrators: List of single-detector azimuthal integrator configurations. :vartype azimuthal_integrators: list[AzimuthalIntegratorConfig] :ivar integrations: Azimuthal integrator configurations. :vartype integrations: list[PyfaiIntegratorConfig] :ivar sum_axes: Sum the detector data over the independent coordinates before integration, defaults to `False`. :vartype sum_axes: bool, optional """ azimuthal_integrators: Optional[conlist( min_length=1, item_type=AzimuthalIntegratorConfig)] = None integrations: conlist(min_length=1, item_type=PyfaiIntegratorConfig) sum_axes: Optional[bool] = False #sum_axes: Optional[ # Union[bool, conlist(min_length=1, item_type=str)]] = False
[docs] @model_validator(mode='before') @classmethod def validate_config(cls, data): """Ensure that a valid configuration was provided and finalize PONI filepaths. :param data: `Pydantic <https://github.com/pydantic/pydantic>`__ validator data object. :type data: dict :return: Currently validated class attributes. :rtype: dict """ if isinstance(data, dict): inputdir = data.get('inputdir') if inputdir is not None and 'azimuthal_integrators' in data: ais = data.get('azimuthal_integrators') for i, ai in enumerate(deepcopy(ais)): if isinstance(ai, dict): poni_file = ai['poni_file'] if not os.path.isabs(poni_file): ais[i]['poni_file'] = os.path.join( inputdir, poni_file) else: poni_file = ai.poni_file if not os.path.isabs(poni_file): ais[i].poni_file = os.path.join( inputdir, poni_file) data['azimuthal_integrators'] = ais return data
[docs] def zarr_tree(self, dataset_shape, dataset_chunks='auto'): """Return a dictionary representing a `zarr.group` that can be used to contain results from :class:`~CHAP.saxswaxs.PyfaiIntegrationProcessor`. :return: A `zarr.group` that can be used to contain the integration results. :rtype: dict """ ais = {ai.get_id(): ai for ai in self.azimuthal_integrators} for integration in self.integrations: integration.init_placeholder_results(ais) tree = { 'root': { 'attributes': { 'description': 'Container for processed SAXS/WAXS data' }, 'children': { integration.name: integration.zarr_tree( dataset_shape, dataset_chunks) for integration in self.integrations } } } return tree