Source code for CHAP.giwaxs.processor

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module for Processors unique to the GIWAXS workflow.

Add discription of GIWAXS
"""

# System modules
from json import loads
from typing import Optional

# Third party modules
import numpy as np
from pydantic import (
    Field,
    PrivateAttr,
    conint,
    constr,
)

# Local modules
from CHAP.giwaxs.models import (
    GiwaxsConversionConfig,
    PyfaiIntegrationConfig,
)
from CHAP.processor import Processor


[docs] class GiwaxsConversionProcessor(Processor): """A processor for converting GIWAXS images from curved to rectangular coordinates (wedge correction). :ivar config: Initialization parameters for an instance of :class:`~CHAP.giwaxs.models.GiwaxsConversionConfig`. :vartype config: dict, optional :ivar nxmemory: Maximum memory usage when reading NeXus files. :vartype nxmemory: int, optional :ivar nxpath: Path to a specific location in the NeXus file tree to read the intensity data from. :vartype nxpath: str, optional :ivar save_figures: Save .pngs of plots for checking inputs & outputs of this Processor, defaults to `False`. :vartype save_figures: bool, optional """ pipeline_fields: dict = Field( default = { 'config': 'giwaxs.models.GiwaxsConversionConfig'}, init_var=True) config: GiwaxsConversionConfig nxmemory: Optional[conint(gt=0)] = 100000 nxpath: Optional[constr(strip_whitespace=True, min_length=1)] = None save_figures: Optional[bool] = True _animation: list = PrivateAttr(default=[]) _figures: list = PrivateAttr(default=[])
[docs] def process(self, data): """Process the GIWAXS input images & configuration and return a map of the images in rectangular coordinates as a NeXus style `NXroot <https://manual.nexusformat.org/classes/base_classes/NXroot.html#nxroot>`__ object. :param data: Results of :class:`~CHAP.common.processor.MapProcessor` containing a map with the GIWAXS input images. :type data: list[PipelineData] :return: Converted GIWAXS images. :rtype: nexusformat.nexus.NXroot """ # Third party modules import fabio from nexusformat.nexus import ( NXdata, NXfield, NXprocess, nxsetconfig, ) from nexusformat.nexus.tree import NeXusError from pyFAI.gui.utils.units import Unit # Local modules from CHAP.common.map_utils import get_axes from CHAP.utils.general import nxcopy nxsetconfig(memory=self.nxmemory) # Load the detector data nxroot = self.get_nxroot(self.get_data(data)) # Validate the azimuthal integrator configuration and check # against the input data (availability and shape) nxentry = nxroot[nxroot.default] detector_ids = [ #str(id, 'utf-8') for id in nxentry.detector_ids.nxdata] str(id) for id in nxentry.detector_ids.nxdata] if len(detector_ids) > 1: raise RuntimeError('More than one detector not yet implemented') nxdata = nxentry[nxentry.default] data = {} independent_dims = {} skipped_detectors = [] ais = [] for ai in self.config.azimuthal_integrators: ai_id = ai.get_id() if ai_id in nxdata: if nxdata[ai_id].ndim != 3: raise RuntimeError('Inconsistent raw data dimension ' f'{nxdata[ai_id].ndim}') ais.append(ai) if self.nxpath is None: data[ai_id] = nxdata[ai_id].nxdata else: data[ai_id] = nxroot[self.nxpath] else: skipped_detectors.append(ai_id) if skipped_detectors: self.logger.warning('Skipping detector(s) ' f'{skipped_detectors} (no raw data)') if not ais: raise RuntimeError('No matching raw detector data found') ai_id = ais[0].get_id() axes = get_axes(nxdata) if not axes: self.logger.warning('Unable to find axes information') independent_dims[ai_id] = [ nxcopy(nxdata[a]) for a in axes] data[ai_id] = nxdata[ai_id] if axes[0] == 'theta': thetas = nxdata['theta'] theta_unit = thetas.attrs.get('units') if 'deg' in theta_unit: thetas = np.radians(thetas) else: thetas = None # Read the mask(s) # FIX read at validation, like the poni file masks = {} for ai in ais: self.logger.debug(f'Reading {ai.mask_file}') try: with fabio.open(ai.mask_file) as f: mask = f.data self.logger.debug( f'mask shape for {ai.get_id()}: {mask.shape}') masks[ai.get_id()] = mask except (IOError, OSError, TypeError, ValueError): self.logger.debug(f'No mask file found for {ai.get_id()}') if not masks: masks = None # Perform integration(s) ais_pyfai = {ai.get_id(): ai.ai for ai in ais} for integration in self.config.integrations: # Add a NXprocess object(s) to the NXroot nxprocess = NXprocess() try: nxroot[f'{nxroot.default}_{integration.name}'] = nxprocess except (NeXusError, ValueError): # Copy nxroot if nxroot is read as read-only nxroot = nxcopy(nxroot) nxroot[f'{nxroot.default}_{integration.name}'] = nxprocess nxprocess.integration_config = integration.model_dump_json() nxprocess.azimuthal_integrators = [ ai.model_dump_json() for ai in ais] # Integrate the data results = integration.integrate( ais_pyfai, data, masks=masks, thetas=thetas) # Create the NXdata object with the integrated data intensities = results['intensities'] coords = [i for k, v in independent_dims.items() for i in v if k in ais_pyfai] q_outofplane = results['outofplane']['coords'] if results['outofplane']['unit'] == 'qoop_A^-1': unit = Unit.INV_ANGSTROM.symbol elif results['outofplane']['unit'] == 'qoop_nm^-1': unit = 'nm^-1' else: unit = results['outofplane']['unit'] if np.asarray(intensities).ndim == 2: intensities = np.expand_dims(intensities, axis=0) coords.append( NXfield( q_outofplane, 'q_outofplane', attrs={'units': unit})) q_inplane = results['inplane']['coords'] if results['inplane']['unit'] == 'qip_A^-1': unit = Unit.INV_ANGSTROM.symbol elif results['inplane']['unit'] == 'qip_nm^-1': unit = 'nm^-1' else: unit = results['inplane']['unit'] coords.append( NXfield( q_inplane, 'q_inplane', attrs={'units': unit})) nxdata = NXdata(NXfield(intensities, ai_id), tuple(coords)) if len(axes) > 1: nxdata.attrs['unstructured_axes'] = nxdata.attrs['axes'][:-1] del nxdata.attrs['axes'] nxprocess.data = nxdata nxprocess.default = 'data' self.config.azimuthal_integrators = ais return nxroot
[docs] class PyfaiIntegrationProcessor(Processor): """A processor for azimuthally integrating images. :ivar config: Initialization parameters for an instance of :class:`~CHAP.giwaxs.models.PyfaiIntegrationConfig`. :vartype config: dict, optional :ivar nxmemory: Maximum memory usage when reading NeXus files. :vartype nxmemory: int, optional """ pipeline_fields: dict = Field( default = { 'config': 'giwaxs.models.PyfaiIntegrationConfig'}, init_var=True) config: PyfaiIntegrationConfig nxmemory: Optional[conint(gt=0)] = 100000
[docs] def process(self, data): """Process the input images & configuration and return a map of the azimuthally integrated images as a NeXus style `NXroot <https://manual.nexusformat.org/classes/base_classes/NXroot.html#nxroot>`__ object. :param data: Results of :class:`~CHAP.common.processor.MapProcessor` or other suitable preprocessor of the raw detector data containing the map of input images. :type data: list[PipelineData] :return: Integrated images. :rtype: nexusformat.nexus.NXroot """ # Third party modules import fabio from nexusformat.nexus import ( NXdata, NXfield, NXprocess, nxsetconfig, ) from pyFAI.gui.utils.units import Unit # Local imports from CHAP.utils.general import nxcopy nxsetconfig(memory=self.nxmemory) # Load the detector data nxroot = self.get_nxroot(self.get_data(data)) # Validate the azimuthal integrator configuration and check # against the input data (availability and shape) data = {} independent_dims = {} try: nxprocess_converted = nxroot[f'{nxroot.default}_converted'] conversion_config = loads( str(nxprocess_converted.conversion_config)) converted_ais = conversion_config['azimuthal_integrators'] if len(converted_ais) > 1: raise RuntimeError( 'More than one detector not yet implemented') if self.config.azimuthal_integrators is None: # Local modules from CHAP.common.models.integration import ( AzimuthalIntegratorConfig, ) ais = [AzimuthalIntegratorConfig(**converted_ais[0])] else: converted_ids = [ai['id'] for ai in converted_ais] skipped_detectors = [] ais = [] for ai in self.config.azimuthal_integrators: if ai.get_id() in converted_ids: ais.append(ai) else: skipped_detectors.append(ai.get_id()) if skipped_detectors: self.logger.warning( f'Skipping detector(s) {skipped_detectors} ' '(no converted data)') if not ais: raise RuntimeError( 'No matching azimuthal integrators found') nxdata = nxprocess_converted.data axes = nxdata.attrs['axes'] if len(nxdata.attrs['axes']) != 3: raise RuntimeError('More than one independent dimension ' 'not yet implemented') axes = axes[0] independent_dims[ais[0].get_id()] = nxcopy(nxdata[axes]) data[ais[0].get_id()] = np.flip(nxdata.nxsignal.nxdata, axis=1) except Exception as exc: experiment_type = loads( str(nxroot[nxroot.default].map_config))['experiment_type'] if experiment_type == 'GIWAXS': self.logger.warning( 'No converted data found, use raw data for integration') nxentry = nxroot[nxroot.default] detector_ids = [ #str(id, 'utf-8') for id in nxentry.detector_ids.nxdata] str(id) for id in nxentry.detector_ids.nxdata] if len(detector_ids) > 1: raise RuntimeError( 'More than one detector not yet implemented') if self.config.azimuthal_integrators is None: raise ValueError('Missing azimuthal_integrators parameter in ' f'PyfaiIntegrationProcessor.config ' f'({self.config})') nxdata = nxentry[nxentry.default] skipped_detectors = [] ais = [] for ai in self.config.azimuthal_integrators: if ai.get_id() in nxdata: if nxdata[ai.get_id()].ndim != 3: raise RuntimeError('Inconsistent raw data dimension ' f'{nxdata[ai.get_id()].ndim}') ais.append(ai) data[ai.get_id()] = nxdata[ai.get_id()].nxdata else: skipped_detectors.append(ai.get_id()) if skipped_detectors: self.logger.warning('Skipping detector(s) ' f'{skipped_detectors} (no raw data)') if not ais: raise RuntimeError('No matching raw detector data found') if 'unstructured_axes' in nxdata.attrs: axes = nxdata.attrs['unstructured_axes'] independent_dims[ais[0].get_id()] = [ nxcopy(nxdata[a]) for a in axes] elif 'axes' in nxdata.attrs: axes = nxdata.attrs['axes'] independent_dims[ais[0].get_id()] = nxcopy(nxdata[axes]) else: self.logger.warning('Unable to find independent_dimensions') data[ais[0].get_id()] = nxdata[ais[0].get_id()] # Select the images to integrate #if False and self.config.scan_step_indices is not None: # #FIX # independent_dims = independent_dims[self.config.scan_step_indices] # data = data[self.config.scan_step_indices] self.logger.debug( f'data shape(s): {[(k, v.shape) for k, v in data.items()]}') if self.config.sum_axes: data = {k:np.sum(v.nxdata, axis=0)[None,:,:] for k, v in data.items()} self.logger.debug('data shape(s) after summing: ' f'{[(k, v.shape) for k, v in data.items()]}') # Read the mask(s) # FIX read at validation, like the poni file masks = {} for ai in ais: self.logger.debug(f'Reading {ai.mask_file}') try: with fabio.open(ai.mask_file) as f: mask = f.data self.logger.debug( f'mask shape for {ai.get_id()}: {mask.shape}') masks[ai.get_id()] = mask except (IOError, OSError, ValueError): self.logger.debug(f'No mask file found for {ai.get_id()}') if not masks: masks = None # Perform integration(s) ais_pyfai = {ai.get_id(): ai.ai for ai in ais} for integration in self.config.integrations: # Add a NXprocess object(s) to the NXroot nxprocess = NXprocess() try: nxroot[f'{nxroot.default}_{integration.name}'] = nxprocess except ValueError: # Copy nxroot if nxroot is read as read-only nxroot = nxcopy(nxroot) nxroot[f'{nxroot.default}_{integration.name}'] = nxprocess nxprocess.integration_config = integration.model_dump_json() nxprocess.azimuthal_integrators = [ ai.model_dump_json() for ai in ais] # Integrate the data results = integration.integrate(ais_pyfai, data, masks) # Create the NXdata object with the integrated data intensities = results['intensities'] if self.config.sum_axes: coords = [] elif isinstance(axes, str): coords = [ v for k, v in independent_dims.items() if k in ais_pyfai] else: coords = [i for k, v in independent_dims.items() for i in v if k in ais_pyfai] if ('azimuthal' in results and results['azimuthal']['unit'] == 'chi_deg'): chi = results['azimuthal']['coords'] if integration.right_handed: chi = -np.flip(chi) intensities = np.flip(intensities, (len(coords))) coords.append(NXfield(chi, 'chi', attrs={'units': 'deg'})) if results['radial']['unit'] == 'q_A^-1': unit = Unit.INV_ANGSTROM.symbol coords.append( NXfield( results['radial']['coords'], 'q', attrs={'units': unit})) else: coords.append( NXfield( results['radial']['coords'], 'r'))#, # attrs={'units': '\u212b'})) self.logger.warning( f'Unknown radial unit: {results["radial"]["unit"]}') nxdata = NXdata(NXfield(intensities, 'integrated'), tuple(coords)) if not isinstance(axes, str): nxdata.attrs['unstructured_axes'] = nxdata.attrs['axes'][:-1] del nxdata.attrs['axes'] nxprocess.data = nxdata nxprocess.default = 'data' self.config.azimuthal_integrators = ais return nxroot
if __name__ == '__main__': # Local modules from CHAP.processor import main main()