Source code for CHAP.saxswaxs.processor

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module for Processors unique to the SAXSWAXS workflow.

Add discription of SAXS/WAXS
"""

# System modules
from typing import (
    Literal,
    Optional,
    Union,
)

# Third party modules
from pydantic import (
    conint,
    conlist,
    Field,
    FilePath,
)
import numpy as np

# Local modules
from CHAP.common.models.map import (
    Detector,
    MapConfig,
)
from CHAP.common.models.integration import PyfaiIntegrationConfig
from CHAP.common.processor import ExpressionProcessor
from CHAP.processor import Processor


[docs] class CfProcessor(Processor): """Processor to calculate the correction factor Cf that, when multiplied by appropriately processed SAXS/WAXS data, converts data to absolute cross-section / intensity in inverse cm. """
[docs] def process( self, data, interactive=False, save_figures=True, nxpath=None, radial_range=None, scan_step_indices=None, eps=1.e-5): """Compute the correction factor Cf and the configuration parameters. :param data: Input data list containing the reference data labelled with `'reference_data'` as well as the NeXus input data with the azimuthally integrated SAXS/WAXS data. :type data: list[PipelineData] :param interactive: Allows for user interactions, defaults to `False`. :type interactive: bool, optional :param save_figures: Create Matplotlib correction factor image that can be saved to file downstream in the workflow, defaults to `True`. :type save_figures: bool, optional :param nxpath: Path to a specific NeXus Style `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__ object in the input NeXus file tree to the measured data from. :type nxpath: str, optional :param radial_range: q-range used to compute Cf. :type radial_range: list[float, float] or tuple[float, float]), optional :param scan_step_indices: Optional scan step indices to use for the calculation. If not specified, the correction factor will be computed on the average of all data for the scan. :type scan_step_indices: int, str, list[int], optional :param eps: Minimum plotting value of the corrected azimuthally integrated SAXS/WAXS data, defaults to `1.e-5`. :type eps: float, optional :returns: Computed correction factor Cf and the configuration parameters plus the optional correction factor image as a :class:`~CHAP.pipeline.PipelineData object`. :rtype: dict or (dict, PipelineData)] """ # Third party modules # pylint: disable=import-error from pandas import DataFrame # pylint: enable=import-error from scipy.interpolate import interp1d # Local modules from CHAP.pipeline import PipelineData from CHAP.utils.general import round_to_n if interactive or save_figures: # Third party modules import matplotlib.pyplot as plt # Local modules from CHAP.utils.general import fig_to_iobuf # Validate the input parameters if scan_step_indices is not None: if isinstance(scan_step_indices, int): scan_step_indices = [scan_step_indices] elif isinstance(scan_step_indices, str): # Local modules from CHAP.utils.general import string_to_list scan_step_indices = string_to_list(scan_step_indices) # Load the measured data if nxpath is None: try: nxdata = self.get_default_nxdata(self.get_data(data)) except Exception as exc: raise ValueError( 'No valid default NXdata object in pipeline data') from exc else: # Third party modules from nexusformat.nexus import NXdata try: nxdata = self.get_data(data)[nxpath] assert isinstance(nxdata, NXdata) except Exception as exc: raise ValueError('No valid default NXdata object in pipeline ' 'data') from exc if scan_step_indices is not None and nxdata.nxsignal.ndim != 2: self.logger.warning( 'Input parameters scan_step_indices for map with a number of ' 'independent dimensions other than 1') scan_step_indices = None if nxdata.nxsignal.ndim == 1: data_meas = nxdata.nxsignal.nxdata elif nxdata.nxsignal.ndim == 2 and scan_step_indices is not None: data_meas = \ nxdata.nxsignal.nxdata[scan_step_indices,:].mean(axis=0) else: data_meas = nxdata.nxsignal.nxdata.mean( axis=tuple(range(nxdata.nxsignal.ndim-1))) q_meas = nxdata[nxdata.attrs['axes'][1]].nxdata # Load the reference data ddata = self.get_data(data, name='reference_data') try: assert isinstance(ddata, DataFrame) assert len(ddata.columns) in (2, 3) q_ref = ddata.values[:,0] data_ref = ddata.values[:,1] except Exception as exc: raise ValueError( 'Invalid reference data format {type(ddata)}') from exc # Interpolate measured reference data onto reference q values mask = np.where( q_ref <= radial_range[1], np.where(q_ref >= radial_range[0], 1, 0), 0).astype(bool) if not q_ref[mask].size: raise ValueError( 'No reference values within specified radial range') func = interp1d(q_meas, data_meas, kind='cubic') data_meas_intpol = func(q_ref[mask]) if not data_meas_intpol.size: raise ValueError( 'No measured values within specified radial range') # Get the correction factor ratio = data_ref[mask]/data_meas_intpol cf = ratio.mean() cf_stv = ratio.std() self.logger.info(f'correction factor Cf: {round_to_n(cf, 6):e} \u00B1 ' f'{round_to_n(100*cf_stv/cf, 2)}%') # Assemble result result = { 'cf': float(cf), 'error': float(cf_stv), 'radial_range': radial_range, } # Plot the results data_meas *= cf figures = None if interactive or save_figures: fig, ax = plt.subplots(figsize=(11, 8.5)) ax.plot(q_ref, data_ref, label='APS Reference Data') ax.plot(q_meas, data_meas, label=r'Corrected FMB Data/C$_f$') for v in radial_range: plt.axvline(v, color='r', linestyle='--') ax.set_yscale('log') ax.set_title('Absolute Intensity Calculation', fontsize='xx-large') ax.set_xlabel(r'{q_A^-1}', fontsize='x-large') ax.set_ylabel('Normalized Intensity', fontsize='x-large') min_x = q_meas.min() max_x = q_meas.max() delta_x = 0.1*(q_meas.max() - q_meas.min()) ax.set_xlim((min_x-delta_x, max_x+delta_x)) ax.set_ylim( (10**np.floor(np.log10(data_meas[data_meas>eps].min())), 10**np.floor(1+np.log10(data_meas.max())))) ax.legend(fontsize='x-large', edgecolor='grey') plt.annotate( r'C$_f$ = 'f'{round_to_n(cf, 6):e}'r' $\pm$ ' f'{round_to_n(100*cf_stv/cf, 2)}%\n' r'1/C$_f$ = 'f'{round_to_n(1/cf, 6):e}', (.65, .9), xycoords = 'axes fraction', fontsize='x-large', bbox={'facecolor': 'white', 'edgecolor': 'grey', 'pad': 10.0}) if save_figures: fig.tight_layout(rect=(0, 0, 1, 0.95)) figures = fig_to_iobuf(fig) if interactive: plt.show() plt.close() if figures is not None: return ( result, PipelineData(name=self.__name__, data=figures, schema='common.write.ImageWriter')) return result
[docs] class FluxCorrectionProcessor(ExpressionProcessor): """Processor for flux correction."""
[docs] def process( self, data, presample_intensity_reference_rate=None, nxprocess=False): """Given input data for `'intensity'`, `'presample_intensity'`, and `'dwell_time_actual'`, compute the flux corrected intensity signal. :param data: Input data list containing items with names `'intensity'`, `'presample_intensity'`, and `'dwell_time_actual'` (if `presample_intensity_reference_rate` is not specified). :type data: list[PipelineData] :param presample_intensity_reference_rate: Reference counting rate for the `'presample_intensity'` signal. If not specified, it will set to `'numpy.nanmean(presample_intensity / dwell_time_actual)'`. :type presample_intensity_reference_rate: float, optional :param nxprocess: Flag to indicate the flux corrected data should be returned as a NeXus style `NXobject <https://manual.nexusformat.org/classes/base_classes/NXobject.html#index-0>`__ object. Defaults to `False`. :type nxprocess: bool, optional :returns: Flux corrected version of input `'intensity'` data. :rtype: Any """ if presample_intensity_reference_rate is None: presample_intensity_reference_rate = super().process( data, 'np.nanmean(presample_intensity / dwell_time_actual)' ) presample_intensity = self.get_data( data, name='presample_intensity', ) intensity = self.get_data( data, name='intensity', ) # nxfieldtable = { # 'intensity': intensity, # 'presample_intensity': presample_intensity, # 'presample_intensity_reference_rate': NXfield( # name='presample_intensity_reference_rate', # values=presample_intensity_reference_rate # ) # } # Extend presample_intensity along last dim to have same shape # as intensity for dim in intensity.shape[presample_intensity.ndim:]: presample_intensity = np.repeat( np.expand_dims(presample_intensity, axis=-1), dim, axis=-1 ) symtable = { 'presample_intensity_reference_rate': presample_intensity_reference_rate, 'intensity': intensity, 'presample_intensity': presample_intensity } expression = ( 'intensity *' '(presample_intensity_reference_rate / presample_intensity)' ) return super().process( data, expression, symtable=symtable, nxprocess=nxprocess, nxfieldtable={} )
[docs] class FluxAbsorptionCorrectionProcessor(ExpressionProcessor): """Processor for flux and absorption correction."""
[docs] def process( self, data, presample_intensity_reference_rate=None, nxprocess=False): """Given input data for `'intensity'`, `'presample_intensity'`, `'postsample_intensity'`, `'background_presample_intensity'`, `'background_postsample_intensity'`, and `'dwell_time_actual'`, compute the flux and absorption corrected intensity signal. :param data: Input data list containing all necessary data labelled with their proper names. :type data: list[PipelineData] :param presample_intensity_reference_rate: Reference counting rate for the `'presample_intensity'` signal. If not specified, it will be calculated with `'numpy.nanmean(presample_intensity / dwell_time_actual)'`. :type presample_intensity_reference_rate: float, optional :param nxprocess: Flag to indicate the flux and absorption corrected data should be returned as a NeXus style `NXobject <https://manual.nexusformat.org/classes/base_classes/NXobject.html#index-0>`__ object. Defaults to `False`. :type nxprocess: bool, optional :returns: Flux and absorption corrected version of input `'intensity'` data. :rtype: Any """ intensity = self.get_data( data, name='intensity', ) if presample_intensity_reference_rate is None: presample_intensity_reference_rate = super().process( data, 'np.nanmean(presample_intensity / dwell_time_actual)' ) tt = super().process( data, ('np.divide(postsample_intensity, presample_intensity) ' '/ np.average(' 'np.divide(background_postsample_intensity, background_presample_intensity))') ) # Extend tt along last dim to have same shape as intensity for dim in intensity.shape[tt.ndim:]: tt = np.repeat(np.expand_dims(tt, axis=-1), dim, axis=-1) presample_intensity = self.get_data( data, name='presample_intensity', ) # Extend presample_intensity along last dim to have same shape # as intensity for dim in intensity.shape[presample_intensity.ndim:]: presample_intensity = np.repeat( np.expand_dims(presample_intensity, axis=-1), dim, axis=-1 ) symtable = { 'presample_intensity_reference_rate': presample_intensity_reference_rate, 'intensity': intensity, 'presample_intensity': presample_intensity, 'tt': tt } expression = ( '(1 / tt)' '* intensity' '* (presample_intensity_reference_rate / presample_intensity)' ) return super().process( data, expression, symtable=symtable, nxprocess=nxprocess, nxfieldtable={} )
[docs] class FluxAbsorptionBackgroundCorrectionProcessor(ExpressionProcessor): """Processor for flux, absorption, and background correction as well as optional thickness correction."""
[docs] def process( self, data, presample_intensity_reference_rate=None, sample_thickness_cm=None, sample_mu_inv_cm=None, nxprocess=False): """Given input data for `'intensity'`, `'presample_intensity'`, `'postsample_intensity'`, `'background_presample_intensity'`, `'background_postsample_intensity'`, `'background_intensity'`, and `'dwell_time_actual'`, return flux, absorption and background corrected intensity signal. :param data: Input data list containing all necessary data labelled with their proper names. :type data: list[PipelineData] :param presample_intensity_reference_rate: Reference counting rate for the `'presample_intensity'` signal. If not specified, it will be calculated with `'numpy.nanmean(presample_intensity / dwell_time_actual)'`. :type presample_intensity_reference_rate: float, optional :param sample_thickness_cm: Sample thickness in centimeters. If specified, this processor will additionally perform thickness correction. Use of this parameter is mutualy exclusive with use of `sample_mu_inv_cm`. :type sample_thickness_cm: float, optional :param sample_mu_inv_cm: Sample linear attenuation coefficient in inverse centimeters. If specified, this processor will additionally perform thickness correction. Use of this parameter is mutualy exclusive with use of `sample_thickness_cm`. :type sample_mu_inv_cm: float, optional :param nxprocess: Flag to indicate the flux, absorption, and background corrected data should be returned as a Nexus style `NXobject <https://manual.nexusformat.org/classes/base_classes/NXobject.html#index-0>`__ object. Defaults to `False`. :type nxprocess: bool, optional :returns: Flux, absorption and background corrected version of input `'intensity'` data. :rtype: Any """ if sample_thickness_cm is not None and sample_mu_inv_cm is not None: raise ValueError(( 'Cannot use sample_thickness_cm and sample_mu_inv_cm' ' at the same time' )) intensity = self.get_data( data, name='intensity', ) if presample_intensity_reference_rate is None: presample_intensity_reference_rate = super().process( data, 'np.nanmean(presample_intensity / dwell_time_actual)' ) tt = super().process( data, ('np.divide(postsample_intensity, presample_intensity) ' '/ np.average(' 'np.divide(background_postsample_intensity, background_presample_intensity))') ) # Extend tt along last dim to have same shape as intensity for dim in intensity.shape[tt.ndim:]: tt = np.repeat(np.expand_dims(tt, axis=-1), dim, axis=-1) presample_intensity = self.get_data( data, name='presample_intensity', ) # Extend presample_intensity along last dim to have same shape # as intensity for dim in intensity.shape[presample_intensity.ndim:]: presample_intensity = np.repeat( np.expand_dims(presample_intensity, axis=-1), dim, axis=-1 ) # Broadcast background intensity signal to shape of measured # intensity signal background_intensity = self.get_data( data, name='background_intensity', ) background_intensity = np.broadcast_to( background_intensity, intensity.shape) if sample_thickness_cm is not None: t = sample_thickness_cm elif sample_mu_inv_cm is not None: t = -np.log(tt / sample_mu_inv_cm) else: t = 1 symtable = { 't': t, 'presample_intensity_reference_rate': presample_intensity_reference_rate, 'intensity': intensity, 'presample_intensity': presample_intensity, 'tt': tt, 'background_intensity': background_intensity } expression = ( '(1 / t)' '* (' '(1 / tt)' '* intensity' '* (presample_intensity_reference_rate / presample_intensity)' ') - (' 'background_intensity' '* (presample_intensity_reference_rate / np.average(background_presample_intensity))' ')' ) return super().process( data, expression, symtable=symtable, nxprocess=nxprocess, nxfieldtable={} )
[docs] class PyfaiIntegrationProcessor(Processor): """A processor for azimuthally integrating images. :ivar config: Initialization parameters for an instance of :class:`~CHAP.common.models.integration.PyfaiIntegrationConfig`. :vartype config: dict, optional """ pipeline_fields: dict = Field( default={ 'config': 'common.models.integration.PyfaiIntegrationConfig' }, init_var=True) config: PyfaiIntegrationConfig
[docs] def process(self, data, idx_slices=None): """Perform a set of integrations on 2D detector data. :param data: Input data. :type data: list[PipelineData] :param idx_slices: List of dictionaries identifying the sliced index at which the output data should be written in a dataset, defaults to `[{'start':0, 'step': 1}]`. :type idx_slices: list[dict[str, int]], optional :return: Integrated detector data ready for writing with :class:`~CHAP.saxswaxs.ZarrResultsWriter` or :class:`~CHAP.saxswaxs.NexusResultsWriter`. :rtype: list[dict[str, Any]] """ # System modules import time if idx_slices is None: idx_slices = [{'start':0, 'step': 1}] # Organize input for integrations input_data = {d['name']: d['data'] for d in [d for d in data if isinstance(d['data'], np.ndarray)]} ais = {ai.get_id(): ai for ai in self.config.azimuthal_integrators} # Finalize idx slice for results idx = tuple(slice(idx_slice.get('start'), idx_slice.get('stop'), idx_slice.get('step')) for idx_slice in idx_slices) # Perform integration(s), package results for ZarrResultsWriter results = [] nframes = len(input_data[list(input_data.keys())[0]]) for integration in self.config.integrations: t0 = time.time() self.logger.info(f'Integrating {integration.name}...') result = integration.integrate(ais, input_data) tf = time.time() self.logger.debug( f'Integrated {integration.name} ' f'({nframes/(tf-t0):.3f} frames/sec)') results.extend( [ { 'path': f'{integration.name}/data/I', 'idx': idx, 'data': np.asarray(result['intensities']), }, ] ) return results
[docs] class SetupResultsProcessor(Processor): """Processor for creating an intital `Zarr group <https://zarr.readthedocs.io/en/stable/api/zarr/group/#zarr.Group>`__ object with empty datasets for filling in by :class:`~CHAP.saxswaxs.PyfaiIntegrationProcessor` and :class:`~CHAP.common.ZarrValuesWriter`. :ivar config: Initialization parameters for an instance of :class:`~CHAP.common.models.integration.PyfaiIntegrationConfig`. :vartype config: dict :ivar dataset_shape: Shape of the completed dataset that will be processed later on (shape of the measurement itself, _not_ including the dimensions of any signals collected at each point in that measurement). :vartype dataset_shape: int or list[int] :ivar dataset_chunks: Extent of chunks along each dimension of the completed dataset / measurement. Choose this according to how you will process your data -- for example, if your `dataset_shape` is `[m, n]`, and you are planning to process each of the `m` rows as chunks, `dataset_chunks` should be `[1, n]`. But if you plan to process each of the `n` columns as chunks, `dataset_chunks` should be `[m, 1]`, defaults to `"auto"`. :vartype dataset_chunks: list[int] or Literal["auto"], optional """ pipeline_fields: dict = Field( default={ 'config': 'common.models.integration.PyfaiIntegrationConfig' }, init_var=True) config: PyfaiIntegrationConfig dataset_shape: conlist(item_type=conint(gt=0), min_length=1) dataset_chunks: Optional[ Union[ Literal['auto'], conlist(item_type=conint(gt=0), min_length=1) ]] = 'auto'
[docs] def process(self, data): """Create and return a `Zarr group <https://zarr.readthedocs.io/en/stable/api/zarr/group/#zarr.Group>`__ object to hold processed SAXS/WAXS data processed by :class:`~CHAP.saxswaxs.PyfaiIntegrationProcessor`. :param data: Input data. :type data: list[PipelineData] :return: Empty structure for filling in SAXS/WAXS data. :rtype: zarr.Group """ # Get Zarr tree as dict from the PyfaiIntegrationConfig tree = self.config.zarr_tree(self.dataset_shape, self.dataset_chunks) # Construct & return the root Zarr group return self.zarr_setup(tree)
[docs] def zarr_setup(self, tree): """Create a `Zarr group <https://zarr.readthedocs.io/en/stable/api/zarr/group/#zarr.Group>`__ object based on a dictionary representing a Zarr tree of groups and arrays. :param tree: Nested dictionary representing a Zarr tree of groups and arrays. :type tree: dict[str, Any] :return: Zarr group corresponding to the contents of `tree`. :rtype: zarr.Group """ # Third party modules # pylint: disable=import-error import zarr from zarr.storage import MemoryStore def create_group_or_dataset(node, zarr_parent, indent=0): """Create and return a `Zarr group <https://zarr.readthedocs.io/en/stable/api/zarr/group/#zarr.Group>`__ `Zarr dataset <https://zarr.readthedocs.io/en/stable/api/zarr/array/#zarr.Array>`__. :param node: Child Zarr tree group. :type node: zarr.Group or zarr.Array :param zarr_parent: Parent Zarr tree group. :type zarr_parent: zarr.Group :param indent: Indentation level, defaults to 0. :type indent: int, optional """ # Set attributes if present if 'attributes' in node: for key, value in node['attributes'].items(): zarr_parent.attrs[key] = value # Create children (groups or datasets) if 'children' in node: for name, child in node['children'].items(): if 'shape' in child or 'data' in child: # It's a dataset self.logger.debug(f'Adding dset: {name}') zarr_parent.create_dataset( name, **child, ) # Set dataset attributes if 'attributes' in child: for key, value in child['attributes'].items(): zarr_parent[name].attrs[key] = value else: # It's a group group = zarr_parent.create_group(name) create_group_or_dataset(child, group, indent=indent+2) results = zarr.create_group(store=MemoryStore({})) create_group_or_dataset(tree['root'], results) return results
[docs] class SetupProcessor(Processor): """Convenience Processor for setting up a container for SAXS/WAXS experiments. :ivar map_config: Map configuration. :vartype map_config: dict, optional :class:`~CHAP.common.models.integration.PyfaiIntegrationConfig`. :ivar pyfai_config: Initialization parameters for an instance of :class:`~CHAP.common.models.integration.PyfaiIntegrationConfig`. :vartype pyfai_config: dict, optional :ivar detectors: Detector configurations. :vartype detectors: DetectorConfig :ivar dataset_shape: Shape of the completed dataset that will be processed later on (shape of the measurement itself, _not_ including the dimensions of any signals collected at each point in that measurement). :vartype dataset_shape: int or list[int] :ivar dataset_chunks: Extent of chunks along each dimension of the completed dataset / measurement. Choose this according to how you will process your data -- for example, if your `dataset_shape` is `[m, n]`, and you are planning to process each of the `m` rows as chunks, `dataset_chunks` should be `[1, n]`. But if you plan to process each of the `n` columns as chunks, `dataset_chunks` should be `[m, 1]`, defaults to `"auto"`. :vartype dataset_chunks: list[int] or Literal["auto"], optional :ivar num_chunk: Used only if `dataset_chunks` is `"auto"`. Preferred number of chunks in the dataset, defaults to `1`. :vartype num_chunk: int, optional :ivar raw_data: Flag to indicate wether or not space for raw detector data should be included in the returned `Zarr group <https://zarr.readthedocs.io/en/latest/api/zarr/group/#zarr.Group>`__, defaults to `True`. :vartype raw_data: bool, optional """ pipeline_fields: dict = Field( default={ 'map_config': 'common.models.map.MapConfig', 'pyfai_config': 'common.models.integration.PyfaiIntegrationConfig' }, init_var=True) # map_config needs a default value because the map configuration # may be created by an earlier item in the Pipeline. If this is # the case, then map_config needs SOME value in order for the # Pipeline to pass validation. map_config: MapConfig = None pyfai_config: PyfaiIntegrationConfig detectors: conlist(item_type=Detector, min_length=1) dataset_shape: Optional[ conlist(item_type=conint(gt=0), min_length=1)] = None dataset_chunks: Optional[ Union[ Literal['auto'], conlist(item_type=conint(gt=0), min_length=1) ]] = 'auto' num_chunk: Optional[conint(gt=0)] = 1 raw_data: Optional[bool] = True
[docs] def process(self, data): """Set up a container for SAXS/WAXS experiments. :param data: Input data. :type data: list[PipelineData] :return: `Zarr group <https://zarr.readthedocs.io/en/stable/api/zarr/group/#zarr.Group>`__ object representing a Zarr tree of the container contents. :rtype: zarr.Group """ # System modules import asyncio import logging # Third party modules # pylint: disable=import-error import zarr from zarr.core.buffer import default_buffer_prototype from zarr.storage import MemoryStore # pylint: enable=import-error # Local modules from CHAP.common import ( MapProcessor, NexusToZarrProcessor, ) from CHAP.pipeline import PipelineData #from CHAP.saxswaxs.processor import SetupResultsProcessor def set_logger(pipeline_item): """Set the logger and logging handler for given pipeline item. :param pipeline_item: Pipeline item. :type pipeline_item: PipelineItem :return: Input Pipeline item, with updated logger and logging handler. :rtype: PipelineItem """ pipeline_item.logger = self.logger pipeline_item.logger.name = pipeline_item.__class__.__name__ handler = logging.StreamHandler() handler.setFormatter(logging.Formatter( '{asctime}: {name:20} (from '+ self.__class__.__name__ + '): {levelname}: {message}', datefmt='%Y-%m-%d %H:%M:%S', style='{')) pipeline_item.logger.removeHandler( pipeline_item.logger.handlers[0]) pipeline_item.logger.addHandler(handler) return pipeline_item # Get NXroot container for raw data map map_processor_kwargs = { 'config': self.map_config } if self.raw_data: map_processor_kwargs['detector_config'] = { 'detectors': self.detectors } else: map_processor_kwargs['detector_config'] = { 'detectors': [] } setup_map_processor = set_logger( MapProcessor( **map_processor_kwargs # config=self.map_config, # detector_config={'detectors': self.detectors}, ) ) ddata = [ PipelineData( data=setup_map_processor.process( data=data, fill_data=False), ) ] if self.dataset_shape is None: try: nxroot = self.get_data(ddata, remove=False) nxentry = self.get_default_nxentry(nxroot) nxdata = nxentry[nxentry.default] for detector in self.detectors: if self.dataset_shape is None: self.dataset_shape = nxdata[ detector.get_id()].shape[:-2] else: assert ( self.dataset_shape == nxdata[ detector.get_id()].shape[:-2] ) except Exception as exc: raise ValueError( 'Unable to get consistent dataset shape from map') from exc if self.dataset_chunks == 'auto': self.dataset_chunks = self.dataset_shape[0]//self.num_chunk if self.num_chunk*self.dataset_chunks < self.dataset_shape[0]: self.dataset_chunks += 1 self.dataset_chunks = [self.dataset_chunks] # Convert raw data map container to Zarr format ddata_converter = set_logger(NexusToZarrProcessor()) zarr_map = ddata_converter.process(ddata, chunks=self.dataset_chunks) # Get Zarr container for integration results setup_results_processor = set_logger( SetupResultsProcessor( config=self.pyfai_config, dataset_shape=self.dataset_shape, dataset_chunks=self.dataset_chunks, ) ) zarr_results = setup_results_processor.process(data) # Assemble containers for raw & processed data zarr_root = zarr.create_group(store=MemoryStore({})) async def copy_zarr_store(source_store, dest_store): """Copy a Zarr <https://zarr.readthedocs.io/en/stable/>`__ store. :param source_store: Source Zarr group. :type: zarr.Group :param dest_store: Destination Zarr group. :type: zarr.Group """ async for k in source_store.list(): self.logger.info(f'Copying {k}') buf = await source_store.get( k, prototype=default_buffer_prototype()) await dest_store.set(k, buf) asyncio.run(copy_zarr_store(zarr_map.store, zarr_root.store)) asyncio.run(copy_zarr_store(zarr_results.store, zarr_root.store)) return zarr_root
[docs] class UnstructuredToStructuredProcessor(Processor): """Processor to aggregate "unstructured" data into a single NeXus style `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__ object with a "structured" representation. """
[docs] def process(self, data, fields, name='data', attrs=None): """Create and return a Nexus style `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__ object containing a single structured dataset composed from multiple unstructured input datasets. This method validates the field configuration, validates and reshapes the input data, determines common axes across all signals, and constructs a `NXdata` group containing signal and axis fields. :param data: Input data objects containing unstructured datasets. :type data: list[PipelineData] :param fields: Configuration describing how to structure the input data. This is a list of dictionaries. Each dictionary must contain the required keys: - ``"name"``: Name of the data item, which must correspond to the ``name`` field of an item in ``data``. - ``"type"``: Either ``"signal"`` or ``"axis"``. - ``"axes"``: Required only for items where ``"type"`` is ``"signal"``. List of the names of the fields containing coordinate axes data for each dimension of the signal. Optional keys include: - ``"attrs"``: Dictionary of NeXus attributes to attach to. :type fields: list[dict[str, Any]] :param name: Name of the resulting `NXdata` group, defaults to `"data"`. :type name: str, optional :param attrs: Attributes to attach to the resulting `NXdata` group. The common axes determined during processing will be added to this dictionary under the ``"axes"`` key. :type attrs: dict[str, Any], optional :returns: A structured `NXdata` object containing all signals and axes defined by the configuration. :rtype: nexusformat.nexus.NXdata """ # Third party modules from nexusformat.nexus import ( NXdata, NXfield, ) signals, axes = self.validate_config_fields(fields) signals, axes = self.validate_data(data, signals, axes) common_axes = self.get_common_axes(signals) signals, axes, common_axes = self.structure_signal_values( signals, axes, common_axes) if attrs is None: attrs = {} attrs.update({'axes': common_axes}) return NXdata( name=name, attrs=attrs, **{ signal['name']: NXfield( name=signal['name'], attrs=signal['attrs'], value=signal['value_structured'] ) for signal in signals }, **{ axis['name']: NXfield( name=axis['name'], attrs=axis['attrs'], value=axis['value_unique'] ) for axis in axes } )
[docs] def validate_config_fields(self, fields): """Validate and normalize the field configuration. This method separates the input field configuration into signal and axis definitions, performs basic validation, and ensures that all axes referenced by signals are defined as axis fields. The returned signal and axis dictionaries are normalized into a consistent internal representation used by later processing stages. :param fields: Configuration describing how input data should be structured. Each item must define a ``"name"`` and ``"type"`` key, where ``"type"`` is either ``"signal"`` or ``"axis"``. Signal entries must additionally define an ``"axes"`` list. :type fields: list[dict[str, Any]] :raises ValueError: If a signal references an axis that is not defined, or if a signal is defined before any axis exist. :returns: Validated signal and axis definitions. :rtype: tuple[list[dict], list[dict]] """ self.logger.info('Validating fields parameter') axes = [] signals = [] for field in fields: field_type = field.get('type') name = field.get('name') attrs = field.get('attrs', {}) if field_type == 'axis': axes.append({'name': name, 'value': None, 'attrs': attrs}) self.logger.debug(f'Registered axis "{name}"') elif field_type == 'signal': _axes = field.get('axes', []) if not axes: raise ValueError(f'Signal "{name}" has no axes defined') signals.append({'name': name, 'axes': _axes, 'value': None, 'attrs': attrs}) self.logger.debug( f'Registered signal "{name}" with axes {_axes}' ) # Validate that all axes used by signals exist as type: axis axes_names = [a['name'] for a in axes] for signal in signals: for axis in signal['axes']: if axis not in axes_names: raise ValueError( f'Signal {signal["name"]} ' + f'references unknown axis "{axis}"' ) self.logger.info( 'Validated configuration for ' + f'{len(signals)} signals and {len(axes)} axes' ) return signals, axes
[docs] def get_common_axes(self, signals): """Determine the common leading axis shared by all signals. This method computes the longest common *prefix* of axis names across all signal definitions. Only axes that appear in the same order at the beginning of each signal's ``axes`` list are included in the result. This is used to identify the shared coordinate dimensions for a structured NeXus style `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__`NXdata` object. :param signals: Validated signal definitions. Each signal must define an ``"axes"`` key containing an ordered list of axis names. :type signals: list[dict] :returns: Axis names that form the common leading axis for all signals. Returns an empty list if no common prefix exists. :rtype: list[str] """ self.logger.info('Computing common dataset axes') if not signals: self.logger.warning('No signals provided; no common axes') return [] # Start with the first signal's axes common_axes = list(signals[0]['axes']) for signal in signals[1:]: _axes = signal['axes'] i = 0 max_i = min(len(common_axes), len(_axes)) while i < max_i and common_axes[i] == _axes[i]: i += 1 common_axes = common_axes[:i] if not common_axes: break self.logger.info(f'Computed common axes: {common_axes}') return common_axes
[docs] def validate_data(self, data, signals, axes): """Validate and normalize input data for axes and signals. This method retrieves raw input data for each axis and signal, propagates metadata attributes, computes unique axis values, and allocates structured arrays for signal data. For each axis: - Raw data is loaded. - Attributes are merged (without overwriting user-specified ones). - Unique axis values are computed. For each signal: - Raw data is loaded. - Attributes are merged (without overwriting user-specified ones). - A structured output array is allocated based on its axes. - Total signal size is validated against the expected shape. :param data: Input data. :type data: list[PipelineData] :param signals: Validated signal field definitions. :type signals: list[dict] :param axes: Validated axis field definitions. :type axes: list[dict] :raises ValueError: If a signal's data size does not match the expected size derived from its axes. :returns: Updated signal and axis definitions with populated values and derived metadata. :rtype: tuple[list[dict], list[dict]] """ self.logger.info('Validating input data') self.logger.info('Validating axis data') for axis in axes: value = self.get_data(data, name=axis['name']) # Merge attributes, preserving explicitly defined ones axis['attrs'] = { **axis['attrs'], **{k: v for k, v in value.attrs.items() if k not in axis['attrs'] and k != 'target'} } axis['value'] = value axis['value_unique'] = np.unique(value) self.logger.debug( f'Axis {axis["name"]}: {value.size} entries, ' f'{axis["value_unique"].size} unique' ) # Build a lookup table for faster axis access by name axes_by_name = {a['name']: a for a in axes} self.logger.info("Validating signal data") for signal in signals: name = signal['name'] value = self.get_data(data, name=name) # Merge attributes, preserving explicitly defined ones signal['attrs'] = { **signal['attrs'], **{k: v for k, v in value.attrs.items() if k not in signal['attrs'] and k != 'target'} } signal['value'] = value _axes = signal['axes'] signal['attrs']['axes'] = _axes shape = tuple( [axes_by_name[a]['value_unique'].size for a in _axes] ) signal['value_structured'] = np.empty(shape, dtype=value.dtype) size_expected = np.prod(shape) size_actual = signal['value'].size self.logger.debug( f'Signal "{name}": expected size {size_expected} (shape: {shape}), ' f'actual size {size_actual} (shape: {value.shape})' ) if size_actual != size_expected: raise(ValueError( f'Signal {name} has size {size_actual}; ' + f'expected {size_expected}' )) self.logger.info('Validated input data') return signals, axes
[docs] def structure_signal_values(self, signals, axes, common_axes): """Reshape and populate structured signal arrays using common axes. This method determines computes index mappings from raw axis values to their unique sorted representations, and inserts each signal's unstructured data into its preallocated structured array. Only the common axes are used for structuring; any trailing, signal-specific axes are assumed to have already been handled when allocating the structured signal arrays. :param signals: Signal definitions with raw and preallocated structured data arrays. :type signals: list[dict] :param axes: Axis definitions containing raw values and unique values. :type axes: list[dict] :param common_axes: Ordered list of the names of the dataset's common axes. :type common_axes: list[str] :returns: - Updated signal definitions with populated structured arrays. - Unmodified axis definitions. - Common axis names shared by all signals. :rtype: tuple[list[dict], list[dict], list[str]] """ self.logger.info('Structuring dataset') axes_by_name = {a['name']: a for a in axes if a['name'] in common_axes} indices = { a: np.searchsorted(axes_by_name[a]['value_unique'], axes_by_name[a]['value']) for a in common_axes } _indices = tuple(indices[a] for a in common_axes) for signal in signals: signal['value_structured'][_indices] = signal['value'] return signals, axes, common_axes
[docs] class UpdateValuesProcessor(Processor): """Processes a slice of data for updating values in an existing container for a SAXS/WAXS experiment. :ivar map_config: Map Configuration. :vartype map_config: dict dict, optional :ivar pyfai_config: Initialization parameters for an instance of :class:`~CHAP.common.models.integration.PyfaiIntegrationConfig`. :vartype pyfai_config: dict, optional :ivar spec_file: SPEC file containing the scan from which to read and process a slice of raw data. :vartype spec_file: str :ivar scan_number: Scan number from which to read and process a slice of raw data. :vartype scan_number: int :ivar detectors: Detector configurations. :vartype detectors: list[CHAP.common.models.map.Detector] :ivar raw_data: Flag to indicate wether or not space for raw detector data should be included in the values returned, defaults to `True`. :vartype raw_data: bool, optional """ pipeline_fields: dict = Field( default={ 'map_config': 'common.models.map.MapConfig', 'pyfai_config': 'common.models.integration.PyfaiIntegrationConfig' }, init_var=True) # map_config needs a default value because the map configuration # may be created by an earlier item in the Pipeline. If this is # the case, then map_config needs SOME value in order for the # Pipeline to pass validation. map_config: MapConfig = None pyfai_config: PyfaiIntegrationConfig spec_file: FilePath scan_number: conint(gt=0) detectors: conlist(item_type=Detector, min_length=1) raw_data: Optional[bool] = True
[docs] def process(self, data, idx_slice=None): """Processes a slice of data for updating values in an existing container for a SAXS/WAXS experiment. :param data: Input data. :type data: list[PipelineData] :param idx_slice: Dictionaries identifying the sliced index at which the output data should be written in a dataset, defaults to `{'start':0, 'step': 1}`. :type idx_slice: dict[str, int], optional :return: Integrated detector data ready for writing with :class:`~CHAP.saxswaxs.ZarrResultsWriter` or :class:`~CHAP.saxswaxs.NexusResultsWriter`. :rtype: list[dict[str, Any]] :return: Integrated detector data for updating values in an existing container for a SAXS/WAXS experiment. :rtype: list[dict[str, Any]] """ # Get updates with MapSliceProcessor # Pass detector data to PyfaiIntegration processor # Concatenate & return results # System modules import logging import os # Local modules from CHAP.common import MapSliceProcessor from CHAP.pipeline import PipelineData #from CHAP.saxswaxs.processor import PyfaiIntegrationProcessor def set_logger(pipeline_item): """Set the logger and logging handler for given pipeline item. :param pipeline_item: Pipeline item. :type pipeline_item: PipelineItem :return: Input Pipeline item, with updated logger and logging handler. :rtype: PipelineItem """ pipeline_item.logger = self.logger pipeline_item.logger.name = pipeline_item.__class__.__name__ handler = logging.StreamHandler() handler.setFormatter(logging.Formatter( '{asctime}: {name:20} (from '+ self.__class__.__name__ + '): {levelname}: {message}', datefmt='%Y-%m-%d %H:%M:%S', style='{')) pipeline_item.logger.removeHandler( pipeline_item.logger.handlers[0]) pipeline_item.logger.addHandler(handler) return pipeline_item if idx_slice is None: idx_slice = {'start':0, 'step': 1} # Read in slice of raw data raw_values = set_logger( MapSliceProcessor( map_config=self.map_config, detectors=self.detectors, spec_file=str(self.spec_file), scan_number=self.scan_number, ) ).process(None, idx_slice=idx_slice) def _get_detector_data(values, name): "Get the detector data.""" for v in values: if os.path.basename(v['path']) == name: return v['data'] return None # Use raw detector data as input to integration for d in self.detectors: data.append( PipelineData( name=d.get_id(), data=_get_detector_data(raw_values, d.get_id()), ) ) # Get integrated data processed_values = set_logger( PyfaiIntegrationProcessor(config=self.pyfai_config) ).process(data, idx_slices=[idx_slice]) if self.raw_data: return raw_values + processed_values detector_ids = [d.get_id() for d in self.detectors] scalar_values = [ d for d in raw_values if not os.path.basename(d['path']) in detector_ids ] return scalar_values + processed_values
if __name__ == '__main__': # Local modules from CHAP.processor import main main()