Source code for CHAP.common.models.map

"""Map related `Pydantic <https://github.com/pydantic/pydantic>`__
model configuration classes."""

# System modules
from copy import deepcopy
from functools import (
    cache,
    lru_cache,
)
import os
from typing import (
    Literal,
    Optional,
    Union,
)

# Third party modules
import numpy as np
from pydantic import (
    Field,
    FilePath,
    PrivateAttr,
    conint,
    conlist,
    constr,
    field_validator,
    model_validator,
)
from pyspec.file.spec import FileSpec

# Local modules
from CHAP.models import CHAPBaseModel


[docs] class CHAPSlice(CHAPBaseModel): """Class representing a slice configuration for any particular dimension of a data set. :ivar start: Starting index for slicing, defaults to `0`. :vartype start: int, optional :ivar end: Ending index for slicing. :vartype end: int, optional :ivar step: Slicing step, defaults to `1`. :vartype step: int, optional """ start: Optional[int] = 0 end: Optional[int] = None # FIX allow stop as alias step: Optional[conint(gt=0)] = 1
[docs] def tolist(self): """Return the slice configuration as a list. :return: Slice configuration. :rtype: list """ return [self.start, self.end, self.step]
[docs] def toslice(self): """Return the slice configuration as a slice object. :return: Slice configuration. :rtype: slice """ return slice(self.start, self.end, self.step)
[docs] class Detector(CHAPBaseModel): """Class representing a single detector. :ivar id: Detector ID (e.g. name or channel index). :vartype id: str :ivar shape: Detector's raw data shape. :vartype shape: tuple[int,int], optional :ivar attrs: Additional detector configuration attributes. :vartype attrs: dict, optional """ id_: constr(min_length=1) = Field(alias='id') shape: Optional[tuple[int, int]] = None attrs: Optional[dict] = {}
[docs] @field_validator('id_', mode='before') @classmethod def validate_id(cls, id_): """Validate the detector ID. :param id: Detector ID (e.g. name or channel index). :type id: int, str :return: Validated detector ID. :rtype: str """ if isinstance(id_, int): return str(id_) return id_
[docs] @model_validator(mode='after') def validate_detector_after(self): """Validate any additional detector configuration attributes. :raises ValueError: Invalid attribute. :return: The validated detector class properties. :rtype: Detector """ # RV FIX add eta name = self.attrs.get('name') if name is not None: if isinstance(name, int): self.attrs['name'] = str(name) elif not isinstance(name, str): raise ValueError return self
[docs] def get_id(self): """Return the detector ID :type: str """ return self.id_
[docs] class DetectorConfig(CHAPBaseModel): """Class representing a detector configuration. :ivar detectors: Detector list. :vartype detectors: list[Detector] :ivar roi: Detector ROI. :vartype roi: list[CHAPSlice, CHAPSlice], optional """ # FIX ROI to make general, now just suited to and tested with TOMO detectors: conlist(item_type=Detector) roi: Optional[conlist( item_type=CHAPSlice, min_length=2, max_length=2)] = None
[docs] @field_validator('roi', mode='before') @classmethod def validate_roi(cls, roi): """Validate the detector ROI. :param roi: Detector ROI. :type roi: list[CHAPSlice, CHAPSlice] :return: Validated detector ROI :rtype: list[CHAPSlice, CHAPSlice] """ if roi is None: return roi return [CHAPSlice().model_dump() if v is None else v for v in roi]
[docs] def tolist(self): """Return the Detector ROI as a list. :return: Slice configuration. :rtype: [list, list] """ return [self.roi[0].tolist(), self.roi[1].tolist()]
[docs] def roitoslice(self): """Return the Detector ROI as a slice object. :return: Slice configuration. :rtype: [slice, slice] """ return [self.roi[0].toslice(), self.roi[1].toslice()]
[docs] class Sample(CHAPBaseModel): """Class representing a sample metadata configuration. :ivar name: Sample name. :vartype name: str :ivar description: Sample description. :vartype description: str, optional """ name: constr(min_length=1) description: Optional[str] = ''
[docs] class SpecScans(CHAPBaseModel): """Class representing a set of scans from a single SPEC file. :ivar spec_file: Path to the SPEC file. :vartype spec_file: str :ivar scan_numbers: Scan numbers to use. :vartype scan_numbers: int or list[int] or str :ivar par_file: Path to a non-default SMB-style par file. :vartype par_file: str, optional """ spec_file: FilePath scan_numbers: Union[ constr(min_length=1), conlist(item_type=conint(gt=0), min_length=1)] par_file: Optional[FilePath] = None
[docs] @field_validator('spec_file') @classmethod def validate_spec_file(cls, spec_file): """Validate the specified SPEC file. :param spec_file: Path to the SPEC file. :type spec_file: str :raises ValueError: If the SPEC file is invalid. :return: Validated absolute path to the SPEC file. :rtype: str """ try: spec_file = os.path.abspath(spec_file) FileSpec(spec_file) except Exception as exc: raise ValueError(f'Invalid SPEC file {spec_file}') from exc return spec_file
[docs] @field_validator('scan_numbers', mode='before') @classmethod def validate_scan_numbers(cls, scan_numbers, info): """Validate the specified list of scan numbers. :param scan_numbers: Scan numbers. :type scan_numbers: int or list[int] or str :param info: Model parameter validation information. :type info: pydantic.ValidationInfo :raises ValueError: If a specified scan number is not found in the SPEC file. :return: Validated scan numbers. :rtype: list[int] """ if isinstance(scan_numbers, int): scan_numbers = [scan_numbers] elif isinstance(scan_numbers, str): # Local modules from CHAP.utils.general import string_to_list scan_numbers = string_to_list(scan_numbers) spec_file = info.data.get('spec_file') if spec_file is not None: spec_scans = FileSpec(spec_file) for scan_number in scan_numbers: scan = spec_scans.get_scan_by_number(scan_number) if scan is None: raise ValueError( f'No scan number {scan_number} in {spec_file}') return scan_numbers
[docs] @field_validator('par_file', mode='before') @classmethod def validate_par_file(cls, par_file): """Validate the specified SMB-style par file. :param par_file: Path to a non-default SMB-style par file. :type par_file: str :raises ValueError: If the SMB-style par file is invalid. :return: Validated absolute path to the SMB-style par file. :rtype: str """ if par_file is None or not par_file: return None par_file = os.path.abspath(par_file) if not os.path.isfile(par_file): raise ValueError(f'Invalid SMB-style par file {par_file}') return par_file
@property def scanparsers(self): """Return the list of `ScanParser`s <https://github.com/CHESSComputing/chess-scanparsers?tab=readme-ov-file>`, for each of the scans specified by the SPEC file and scan numbers belonging to this instance of :class:`~CHAP.common.models.map.SpecScans`. :type: list[chess_scanparsers.ScanParser] """ return [self.get_scanparser(scan_no) for scan_no in self.scan_numbers]
[docs] def get_scanparser(self, scan_number): """Return a `ScanParser <https://github.com/CHESSComputing/chess-scanparsers?tab=readme-ov-file>`, for the specified scan number in the specified SPEC file. :param scan_number: Scan number to get a `ScanParser` for. :type scan_number: int :return: `ScanParser` for the specified scan number. :rtype: chess_scanparsers.ScanParser """ if self.par_file is None: return get_scanparser(self.spec_file, scan_number) return get_scanparser( self.spec_file, scan_number, par_file=self.par_file)
[docs] def get_index(self, scan_number, scan_step_index, map_config): """Return a tuple representing the index of a specific step in a specific SPEC scan within a map. :param scan_number: Scan number to get index for. :type scan_number: int :param scan_step_index: Scan step index to get index for. :type scan_step_index: int :param map_config: Map configuration to get index for. :type map_config: MapConfig :return: Index for the specified scan number and scan step index within the specified map configuration. :rtype: tuple """ index = () for independent_dimension in map_config.independent_dimensions: coordinate_index = list( map_config.coords[independent_dimension.label]).index( independent_dimension.get_value( self, scan_number, scan_step_index, map_config.scalar_data)) index = (coordinate_index, *index) return index
[docs] def get_detector_data(self, detectors, scan_number, scan_step_index): """Return the raw data from the specified detectors at the specified scan number and scan step index. :param detectors: Detector prefixes to get raw data for. :type detectors: list[str] :param scan_number: Scan number to get data for. :type scan_number: int :param scan_step_index: Scan step index to get data for. :type scan_step_index: int :return: Data from the specified detectors for the specified scan number and scan step index. :rtype: list[np.ndarray] """ return get_detector_data( tuple([detector.prefix for detector in detectors]), self.spec_file, scan_number, scan_step_index)
[docs] @cache def get_available_scan_numbers(spec_file): """Get the available scan numbers. :param spec_file: Path to the SPEC file. :type spec_file: str :return: Available scan numbers. :rtype: list[pyspec.file.spec.FileSpec] """ return list(FileSpec(spec_file).scans.keys())
[docs] @cache def get_scanparser(spec_file, scan_number, par_file=None): """Get the scanparser. :param spec_file: Path to the SPEC file. :type spec_file: str :param scan_number: Scan number to get data for. :type scan_number: int :param par_file: Path to a SMB-style par file. :type par_file: str, optional :return: `ScanParser` for the requested scan. :rtype: list[chess_scanparsers.ScanParser] """ # pylint: disable=undefined-variable if scan_number not in get_available_scan_numbers(spec_file): return None if par_file is None: return ScanParser(spec_file, scan_number) return ScanParser(spec_file, scan_number, par_file=par_file)
[docs] @lru_cache(maxsize=10) def get_detector_data( detector_prefixes, spec_file, scan_number, scan_step_index): """Get the detector data. :param detector_prefixes: Detector prefixes. :type detector_prefixes: tuple[str] or list[str] :param spec_file: Path to the SPEC file. :type spec_file: str :param scan_number: Scan number to get data for. :type scan_number: int :param scan_step_index: Scan step index. :type scan_step_index: int :return: Detector data. :rtype: list[numpy.ndarray] """ detector_data = [] scanparser = get_scanparser(spec_file, scan_number) for prefix in detector_prefixes: image_data = scanparser.get_detector_data(prefix, scan_step_index) detector_data.append(image_data) return detector_data
[docs] class PointByPointScanData(CHAPBaseModel): """Class representing a source of raw scalar-valued data for which a value was recorded at every point in a :class:`~CHAP.common.models.map.MapConfig`. :ivar label: User-defined label for referring to this data in the NeXus file and in other tools. :vartype label: str :ivar units: Units in which the data were recorded. :vartype units: str :ivar data_type: Represents how these data were recorded at time of data collection. :vartype data_type: Literal[ 'expression', 'detector_log_timestamps', 'scan_column', 'scan_start_time', 'scan_step_index', 'smb_par', 'spec_motor', 'spec_motor_absolute', 'spec_motor_static'] :ivar name: Represents the name with which these raw data were recorded at time of data collection. :vartype name: str :ivar ndigits: Round SPEC motor values to the specified number of decimals if set. :vartype ndigits: int, optional """ label: constr(min_length=1) units: constr(strip_whitespace=True, min_length=1) data_type: Literal[ 'expression', 'detector_log_timestamps', 'scan_column', 'scan_start_time', 'scan_step_index', 'smb_par', 'spec_motor', 'spec_motor_absolute', 'spec_motor_static'] name: constr(strip_whitespace=True, min_length=1) ndigits: Optional[conint(ge=0)] = None
[docs] @field_validator('label') @classmethod def validate_label(cls, label): """Validate that the supplied `label` does not conflict with any of the values for `label` reserved for certain data needed to perform corrections. :param label: Input value of `label`. :type label: str :raises ValueError: If `label` is one of the reserved values. :return: Originally supplied value `label`. :rtype: str """ if ((not issubclass(cls,CorrectionsData)) and label in CorrectionsData.reserved_labels()): raise ValueError( f'{cls.__class__.__name__}.label may not be any of the ' 'following reserved values: ' f'{CorrectionsData.reserved_labels()}') return label
[docs] def validate_for_station(self, station): """Validate this instance of :class:`~CHAP.common.models.map.PointByPointScanData` for a certain choice of station (beamline). :param station: Name of the station at which the data was collected. :type station: Literal['id1a3', 'id3a', 'id3b', 'id4b'] :param experiment_type: Experiment type. :type experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] :raises TypeError: If the station is not compatible with the value of the `data_type` attribute for this instance of :class:`~CHAP.common.models.map.PointByPointScanData`. """ if (station.lower() not in ('id1a3', 'id3a') and self.data_type == 'smb_par'): raise TypeError( f'{self.__class__.__name__}.data_type may not be "smb_par" ' f'when station is "{station}"') if (not station.lower() == 'id3b' and self.data_type == 'detector_log_timestamps'): raise TypeError( f'{self.__class__.__name__}.data_type may not be' + f' "detector_log_timestamps" when station is "{station}"')
[docs] def validate_for_spec_scans( self, spec_scans, scan_step_index='all'): """Validate this instance of :class:`~CHAP.common.models.map.PointByPointScanData` for a list of :class:`~CHAP.common.models.map.SpecScans`. :param spec_scans: List of :class:`~CHAP.common.models.map.SpecScans`'s whose raw data will be checked for the presence of the data represented by this instance of :class:`~CHAP.common.models.map.PointByPointScanData`. :type spec_scans: list[SpecScans] :param scan_step_index: Specific scan step index to validate, defaults to `'all'`. :type scan_step_index: int or Literal['all'], optional :raises RuntimeError: If the data represented by this instance of :class:`~CHAP.common.models.map.PointByPointScanData` is missing for the specified scan steps. """ for scans in spec_scans: for scan_number in scans.scan_numbers: scanparser = scans.get_scanparser(scan_number) if scan_step_index == 'all': scan_step_index_range = range(scanparser.spec_scan_npts) else: scan_step_index_range = range( scan_step_index, 1+scan_step_index) for index in scan_step_index_range: try: self.get_value(scans, scan_number, index) except Exception as exc: raise RuntimeError( f'Could not find data for {self.name} ' f'(data_type "{self.data_type}") ' f'on scan number {scan_number} ' f'for index {index} ' f'in spec file {scans.spec_file}') from exc
[docs] def validate_for_scalar_data(self, scalar_data): """Used for :class:`~CHAP.common.models.map.PointByPointScanData` objects with a `data_type` of `'expression'`. Validate that the `scalar_data` field of ai :class:`~CHAP.common.models.map.MapConfig` object contains all the items necessary for evaluating the expression. :param scalar_data: The `scalar_data` field of a :class:`~CHAP.common.models.map.MapConfig` that this :class:`~CHAP.common.models.map.PointByPointScanData` object will be validated against. :type scalar_data: list[PointByPointScanData] :raises ValueError: if `scalar_data` does not contain items needed for evaluating the expression. """ # Third party modules from ast import parse from asteval import get_ast_names labels = get_ast_names(parse(self.name)) for label in ('round', 'np', 'numpy'): try: labels.remove(label) except Exception: pass for l in labels: if l in ('round', 'np', 'numpy'): continue label_found = False for s_d in scalar_data: if s_d.label == l: label_found = True break if not label_found: raise ValueError( f'{l} is not the label of an item in scalar_data')
[docs] def get_value( self, spec_scans, scan_number, scan_step_index=0, scalar_data=None, relative=True, static=False, ndigits=None): """Return the value recorded for this instance of :class:`~CHAP.common.models.map.PointByPointScanData` at a specific scan step. :param spec_scans: An instance of :class:`~CHAP.common.models.map.SpecScans` in which the requested scan step occurs. :type spec_scans: SpecScans :param scan_number: Number of the scan in which the requested scan step occurs. :type scan_number: int :param scan_step_index: Index of the requested scan step, defaults to `0`. :type scan_step_index: int, optional :param scalar_data: Scalar data configurations used to get values for :class:`~CHAP.common.models.map.PointByPointScanData` objects with `data_type == 'expression'`. :type scalar_data: list[PointByPointScanData], optional :param relative: Whether to return a relative value or not, defaults to `True` (only applies to SPEC motor values). :type relative: bool, optional :params ndigits: Round SPEC motor values to the specified number of decimals if set. :type ndigits: int, optional :param static: Wether to return just a static motor postion even if the motor is scanned (in which case: return the first position of the motor in the scan), defaults to `False`. :type static: bool, optional :return: Value recorded of the data represented by this instance of :class:`~CHAP.common.models.map.PointByPointScanData` at the scan step requested. :rtype: float """ if 'spec_motor' in self.data_type: if ndigits is None: ndigits = self.ndigits if self.data_type.endswith('absolute'): relative = False if self.data_type.endswith('static'): static = True relative = False return get_spec_motor_value( spec_scans.spec_file, scan_number, scan_step_index, self.name, relative, static, ndigits) if self.data_type == 'scan_column': return get_spec_counter_value( spec_scans.spec_file, scan_number, scan_step_index, self.name) if self.data_type == 'scan_start_time': start_time = get_scan_start_time(spec_scans.spec_file, scan_number) if scan_step_index < 0: scanparser = get_scanparser(spec_scans.spec_file, scan_number) return np.array([start_time] * scanparser.spec_scan_npts) return start_time if self.data_type == 'smb_par': return get_smb_par_value( spec_scans.spec_file, scan_number, self.name) if self.data_type == 'expression': if scalar_data is None: scalar_data = [] return get_expression_value( spec_scans, scan_number, scan_step_index, self.name, scalar_data) if self.data_type == 'detector_log_timestamps': timestamps = get_detector_log_timestamps( spec_scans.spec_file, scan_number, self.name) if scan_step_index >= 0: return timestamps[scan_step_index] return timestamps if self.data_type == 'scan_step_index': if scan_step_index >= 0: return scan_step_index scanparser = get_scanparser(spec_scans.spec_file, scan_number) return [i for i in range(scanparser.spec_scan_npts)] return None
[docs] @cache def get_spec_motor_value( spec_file, scan_number, scan_step_index, spec_mnemonic, relative=True, static=False, ndigits=None): """Return the value recorded for a SPEC motor at a specific scan step. :param spec_file: Location of a SPEC file in which the requested scan step occurs. :type spec_scans: str :param scan_number: Number of the scan in which the requested scan step occurs. :type scan_number: int :param scan_step_index: Requested scan step index. :type scan_step_index: int :param spec_mnemonic: SPEC motor mnemonic. :type spec_mnemonic: str :param relative: Whether to return a relative value or not, defaults to `True`. :type relative: bool, optional :param static: Wether to return just a static motor postion even if the motor is scanned (in which case: return the first position of the motor in the scan); defaults to `False`. :type static: bool, optional :params ndigits: Round SPEC motor values to the specified number of decimals if set. :type ndigits: int, optional :return: Value of the motor at the scan step requested. :rtype: float """ scanparser = get_scanparser(spec_file, scan_number) if (hasattr(scanparser, 'spec_scan_motor_mnes') and spec_mnemonic in scanparser.spec_scan_motor_mnes): motor_i = scanparser.spec_scan_motor_mnes.index(spec_mnemonic) if scan_step_index >= 0: scan_step = np.unravel_index( scan_step_index, scanparser.spec_scan_shape, order='F') if static: motor_value = scanparser.get_spec_scan_motor_vals( relative)[motor_i][0] else: motor_value = \ scanparser.get_spec_scan_motor_vals( relative)[motor_i][scan_step[motor_i]] else: motor_value = scanparser.get_spec_scan_motor_vals( relative)[motor_i] if static: motor_value = [motor_value[0]] * scanparser.spec_scan_npts else: if len(scanparser.spec_scan_shape) == 2: if motor_i == 0: motor_value = np.concatenate( [motor_value] * scanparser.spec_scan_shape[1]) else: motor_value = np.repeat( motor_value, scanparser.spec_scan_shape[0]) else: motor_value = scanparser.get_spec_positioner_value(spec_mnemonic) if ndigits is not None: motor_value = np.round(motor_value, ndigits) return motor_value
[docs] @cache def get_spec_counter_value( spec_file, scan_number, scan_step_index, spec_column_label): """Return the value recorded for a SPEC counter at a specific scan step. :param spec_file: Location of a SPEC file in which the requested scan step occurs. :type spec_scans: str :param scan_number: Number of the scan in which the requested scan step occurs. :type scan_number: int :param scan_step_index: Requested scan step index. :type scan_step_index: int :param spec_column_label: SPEC data column label. :type spec_column_label: str :return: Value of the counter at the scan step requested. :rtype: float """ scanparser = get_scanparser(spec_file, scan_number) if scan_step_index >= 0: return scanparser.spec_scan_data[spec_column_label][scan_step_index] return scanparser.spec_scan_data[spec_column_label]
[docs] @cache def get_smb_par_value(spec_file, scan_number, par_name): """Return the value recorded for a specific scan in SMB-tyle par file. :param spec_file: Location of a SPEC file in which the requested scan step occurs. :type spec_scans: str :param scan_number: Number of the scan in which the requested scan step occurs. :type scan_number: int :param par_name: Name of the column in the par file. :type par_name: str :return: Value of the par file value for the scan requested. :rtype: float """ scanparser = get_scanparser(spec_file, scan_number) return scanparser.pars[par_name]
[docs] @cache def get_scan_start_time(spec_file, scan_number): """Return the start time of the indicated spec scan as the unix epoch (in seconds). :param spec_file: SPEC file location. :type spec_file: str :param scan_number: Scan number. :returns: Epoch at which the scan began. :rtype: int """ # System modules import datetime import zoneinfo scan = get_scanparser(spec_file, scan_number).spec_scan start_time = datetime.datetime.strptime(scan.date, '%c') start_time = start_time.replace( tzinfo=zoneinfo.ZoneInfo('America/New_York') ) start_time = start_time.astimezone(tz=datetime.timezone.utc) return start_time.timestamp()
[docs] def get_expression_value( spec_scans, scan_number, scan_step_index, expression, scalar_data): """Return the value of an evaluated expression of other sources of point-by-point scalar scan data for a single point. :param spec_scans: Instance of :class:`~CHAP.common.models.map.SpecScans` in which the requested scan step occurs. :type spec_scans: SpecScans :param scan_number: Number of the scan in which the requested scan step occurs. :type scan_number: int :param scan_step_index: Requested scan step index. :type scan_step_index: int :param expression: String expression to evaluate. :type expression: str :param scalar_data: `scalar_data` field of a :class:`~CHAP.common.models.map.MapConfig` object (used to provide values for variables used in `expression`). :type scalar_data: list[PointByPointScanData] :return: Par file value for the requested scan. :rtype: float """ # Third party modules from ast import parse from asteval import get_ast_names, Interpreter labels = get_ast_names(parse(expression)) symtable = {} for l in labels: if l == 'round': symtable[l] = round for s_d in scalar_data: if s_d.label == l: symtable[l] = s_d.get_value( spec_scans, scan_number, scan_step_index, scalar_data) aeval = Interpreter(symtable=symtable) return aeval(expression)
[docs] @cache def get_detector_log_timestamps(spec_file, scan_number, detector_prefix): """Return the list of detector timestamps for the given scan and detector prefix. :param spec_file: Location of a SPEC file in which the requested scan occurs. :type spec_scans: str :param scan_number: Number of the scan for which to return detector log timestamps. :type scan_number: int :param detector_prefix: Prefix of the detector whose log file should be used. :return: All detector log timestamps for the given scan. :rtype: list[float] """ sp = get_scanparser(spec_file, scan_number) return sp.get_detector_log_timestamps(detector_prefix)
[docs] def validate_data_source_for_map_config(data_source, info): """Confirm that an instance of :class:`~CHAP.common.models.map.PointByPointScanData` is valid for the station and scans provided by a map configuration dictionary. :param data_source: Input object to validate. :type data_source: PointByPointScanData :param info: Model parameter validation information. :type info: pydantic.ValidationInfo :raises Exception: If `data_source` cannot be validated. :return: Validated `data_source` instance. :rtype: PointByPointScanData """ def _validate_data_source_for_map_config(data_source, info): if isinstance(data_source, list): return [_validate_data_source_for_map_config(d_s, info) for d_s in data_source] if data_source is not None: values = info.data if data_source.data_type == 'expression': data_source.validate_for_scalar_data(values['scalar_data']) else: import_scanparser( values['station'], values['experiment_type']) data_source.validate_for_station(values['station']) if values['validate_data_present']: data_source.validate_for_spec_scans(values['spec_scans']) return data_source return _validate_data_source_for_map_config(data_source, info)
[docs] class IndependentDimension(PointByPointScanData): """Class representing the source of data to identify the coordinate values along one dimension of a :class:`~CHAP.common.models.map.MapConfig`. :ivar start: Starting index for slicing all datasets of a :class:`~CHAP.common.models.map.MapConfig` along this axis, defaults to `0`. :vartype start: int, optional :ivar end: Ending index for slicing all datasets of a :class:`~CHAP.common.models.map.MapConfig` along this axis, defaults to the total number of unique values along this axis in the associated :class:`~CHAP.common.models.map.MapConfig`. :vartype end: int, optional :ivar step: Step for slicing all datasets of a :class:`~CHAP.common.models.map.MapConfig` along this axis, defaults to `1`. :vartype step: int, optional """ # FIX convert to using CHAPSlice start: Optional[conint(ge=0)] = 0 end: Optional[int] = None step: Optional[conint(gt=0)] = 1
# @field_validator('step') # @classmethod # def validate_step(cls, step): # """Validate that the supplied value of `step`. # # :param step: `step` value to validate. # :type step: str # :raises ValueError: If `step` is zero. # :return: Validated `step` value. # :rtype: int # """ # if step == 0 : # raise ValueError('slice step cannot be zero') # return step
[docs] class CorrectionsData(PointByPointScanData): """Class representing the special instances of :class:`~CHAP.common.models.map.PointByPointScanData` that are used by certain kinds of `Correction` tools. :ivar label: One of the reserved values required by the `Correction` tool configurations. :vartype label: Literal['dwell_time_actual', 'postsample_intensity', 'presample_intensity'] :ivar data_type: Represents how these data were recorded at time of data collection. :vartype data_type: Literal['scan_column', 'smb_par'] """ label: Literal['dwell_time_actual', 'postsample_intensity', 'presample_intensity'] data_type: Literal['scan_column','smb_par']
[docs] @classmethod def reserved_labels(cls): """Return a list of all the labels reserved for corrections-related scalar data. :return: Reserved labels. :rtype: list[str] """ return list((*cls.model_fields['label'].annotation.__args__, 'round'))
[docs] class PresampleIntensity(CorrectionsData): """Class representing a source of raw data for the intensity of the beam that is incident on the sample. :ivar label: Must be `'presample_intensity"`. :vartype label: Literal['presample_intensity'] :ivar units: Must be `'counts'`. :vartype units: Literal['counts'] """ label: Literal['presample_intensity'] = 'presample_intensity' units: Literal['counts'] = 'counts'
[docs] class PostsampleIntensity(CorrectionsData): """Class representing a source of raw data for the intensity of the beam that has passed through the sample. :ivar label: Must be `'postsample_intensity'`. :vartype label: Literal['postsample_intensity'] :ivar units: Must be `'counts'`. :vartype units: Literal['counts'] """ label: Literal['postsample_intensity'] = 'postsample_intensity' units: Literal['counts'] = 'counts'
[docs] class DwellTimeActual(CorrectionsData): """Class representing a source of raw data for the actual dwell time at each scan point in SPEC (with some scan types, this value can vary slightly point-to-point from the dwell time specified in the command). :ivar label: Must be `'dwell_time_actual'`. :vartype label: Literal['dwell_time_actual'] :ivar units: Must be `'s'`. :vartype units: Literal['s'] """ label: Literal['dwell_time_actual'] = 'dwell_time_actual' units: Literal['s'] = 's'
[docs] class SpecConfig(CHAPBaseModel): """Class representing the raw data for one or more SPEC scans. :ivar station: Name of the station at which the data was collected. :vartype station: Literal['id1a3', 'id3a', 'id3b', 'id4b'] :ivar experiment_type: Experiment type. :vartype experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] :ivar spec_scans: SPEC scans that compose the set. :vartype spec_scans: list[SpecScans] """ station: Literal['id1a3', 'id3a', 'id3b', 'id4b'] experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] spec_scans: conlist(item_type=SpecScans, min_length=1)
[docs] @model_validator(mode='before') @classmethod def validate_specconfig_before(cls, data): """Ensure that a valid configuration was provided and finalize spec_file filepaths. :param data: `Pydantic <https://github.com/pydantic/pydantic>`__ validator data object. :type data: dict :return: Currently validated class attributes. :rtype: dict """ inputdir = data.get('inputdir') if inputdir is not None: spec_scans = data.get('spec_scans') for i, scans in enumerate(deepcopy(spec_scans)): if isinstance(scans, dict): spec_file = scans['spec_file'] if not os.path.isabs(spec_file): spec_scans[i]['spec_file'] = os.path.join( inputdir, spec_file) else: spec_file = scans.spec_file if not os.path.isabs(spec_file): spec_scans[i].spec_file = os.path.join( inputdir, spec_file) data['spec_scans'] = spec_scans return data
[docs] @field_validator('experiment_type') @classmethod def validate_experiment_type(cls, experiment_type, info): """Ensure values for the station and experiment_type fields are compatible. :param experiment_type: `experiment_type` value to validate. :type experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] :param info: Model parameter validation information. :type info: pydantic.ValidationInfo :raises ValueError: Invalid experiment type. :return: Validated `experiment_type` value. :rtype: str """ station = info.data.get('station') if station == 'id1a3': allowed_experiment_types = ['EDD', 'SAXSWAXS', 'TOMO'] elif station == 'id3a': allowed_experiment_types = ['EDD', 'TOMO'] elif station == 'id3b': allowed_experiment_types = ['GIWAXS', 'SAXSWAXS', 'TOMO', 'XRF'] elif station == 'id4b': allowed_experiment_types = ['HDRM'] else: allowed_experiment_types = [] if experiment_type not in allowed_experiment_types: raise ValueError( f'For station {station}, allowed experiment types are ' f'{", ".join(allowed_experiment_types)}. ' f'Supplied experiment type {experiment_type} is not allowed.') import_scanparser(station, experiment_type) return experiment_type
[docs] class MapConfig(CHAPBaseModel): """Class representing an experiment consisting of one or more SPEC scans. :ivar did: `FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__ data identifier. :vartype did: str, optional :ivar title: Map configuration title. :vartype title: str :ivar station: Name of the station at which the data was collected. :vartype station: Literal['id1a3', 'id3a', 'id3b', 'id4b'] :ivar experiment_type: Experiment type. :vartype experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] :ivar sample: Sample metadata configuration. :vartype sample: CHAP.commom.models.map.Sample :ivar spec_scans: SPEC scans that compose the map. :vartype spec_scans: list[SpecScans] :ivar scalar_data: All sources of data representing other scalar raw data values collected at each point on the map. In the NeXus file representation of the map, datasets for these values will be included, defaults to `[]`. :vartype scalar_data: list[PointByPointScanData], optional :ivar independent_dimensions: Data sources representing the raw values of each independent dimension of the map. :vartype independent_dimensions: list[PointByPointScanData] :ivar presample_intensity: Source of point-by-point presample beam intensity data. Required when applying certain kinds of `Correction` tools. :vartype presample_intensity: PresampleIntensity, optional :ivar dwell_time_actual: Source of point-by-point actual dwell times for SPEC scans. Required when applying certain kinds of `Correction` tools. :vartype dwell_time_actual: DwellTimeActual, optional :ivar postsample_intensity: Source of point-by-point postsample beam intensity data. Required when applying certain kinds of `Correction` tools. :vartype postsample_intensity: PresampleIntensity, optional :ivar attrs: Additional map configuration configuration attributes. :vartype attrs: dict, optional """ validate_data_present: bool = True did: Optional[constr(strip_whitespace=True)] = None title: constr(strip_whitespace=True, min_length=1) station: Literal['id1a3', 'id3a', 'id3b', 'id4b'] experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] sample: Sample spec_scans: conlist(item_type=SpecScans, min_length=1) scalar_data: Optional[conlist(item_type=PointByPointScanData)] = [] independent_dimensions: conlist( item_type=IndependentDimension, min_length=1) presample_intensity: Optional[PresampleIntensity] = None dwell_time_actual: Optional[DwellTimeActual] = None postsample_intensity: Optional[PostsampleIntensity] = None attrs: dict = {} # _coords: dict = PrivateAttr() _dims: tuple = PrivateAttr() # _scan_step_indices: list = PrivateAttr() # _shape: tuple = PrivateAttr() _validate_independent_dimensions = field_validator( 'independent_dimensions')(validate_data_source_for_map_config) _validate_presample_intensity = field_validator( 'presample_intensity')(validate_data_source_for_map_config) _validate_dwell_time_actual = field_validator( 'dwell_time_actual')(validate_data_source_for_map_config) _validate_postsample_intensity = field_validator( 'postsample_intensity')(validate_data_source_for_map_config) _validate_scalar_data = field_validator( 'scalar_data')(validate_data_source_for_map_config)
[docs] @model_validator(mode='before') @classmethod def validate_mapconfig_before(cls, data, info): """Ensure that a valid configuration was provided and finalize spec_file filepaths. :param data: `Pydantic <https://github.com/pydantic/pydantic>`__ validator data object. :type data: dict :return: Currently validated class attributes. :rtype: dict """ if 'spec_file' in data and 'scan_numbers' in data: spec_file = data.pop('spec_file') scan_numbers = data.pop('scan_numbers') if 'par_file' in data: par_file = data.pop('par_file') else: par_file = None if 'spec_scans' in data: raise ValueError( f'Ambiguous SPEC scan information: spec_file={spec_file},' f' scan_numbers={scan_numbers}, and ' f'spec_scans={data["spec_scans"]}') if par_file is None: data['spec_scans'] = [ {'spec_file': spec_file, 'scan_numbers': scan_numbers}] else: data['spec_scans'] = [ {'spec_file': spec_file, 'scan_numbers': scan_numbers, 'par_file': par_file}] else: spec_scans = data.get('spec_scans') if 'spec_scans' in data: inputdir = data.get('inputdir') if inputdir is None and info.data is not None: inputdir = info.data.get('inputdir') for i, scans in enumerate(deepcopy(spec_scans)): if isinstance(scans, SpecScans): scans = scans.model_dump() spec_file = scans['spec_file'] if inputdir is not None and not os.path.isabs(spec_file): scans['spec_file'] = os.path.join(inputdir, spec_file) spec_scans[i] = SpecScans(**scans, **data) data['spec_scans'] = spec_scans return data
[docs] @field_validator('experiment_type') @classmethod def validate_experiment_type(cls, experiment_type, info): """Ensure values for the station and experiment_type fields are compatible. :param experiment_type: `experiment_type` value to validate. :type experiment_type: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] :param info: Model parameter validation information. :type info: pydantic.ValidationInfo :raises ValueError: Invalid experiment type. :return: Validated `experiment_type` value. :rtype: str """ station = info.data['station'] if station == 'id1a3': allowed_experiment_types = ['EDD', 'SAXSWAXS', 'TOMO'] elif station == 'id3a': allowed_experiment_types = ['EDD', 'TOMO'] elif station == 'id3b': allowed_experiment_types = ['GIWAXS', 'SAXSWAXS', 'TOMO', 'XRF'] elif station == 'id4b': allowed_experiment_types = ['HDRM'] else: allowed_experiment_types = [] if experiment_type not in allowed_experiment_types: raise ValueError( f'For station {station}, allowed experiment types are ' f'{", ".join(allowed_experiment_types)}. ' f'Supplied experiment type {experiment_type} is not allowed.') return experiment_type
[docs] @model_validator(mode='before') @classmethod def validate_before(cls, data): """Ensure that the `attrs` parameter is initialized. :param data: `Pydantic <https://github.com/pydantic/pydantic>`__ validator data object. :type data: dict :return: Currently validated class attributes. :rtype: dict """ if data.get('attrs') is None: data['attrs'] = {} return data
#RV maybe better to use model_validator, see v2 docs?
[docs] @field_validator('attrs') @classmethod def validate_attrs(cls, attrs, info): """Validate any additional attributes depending on the values for the station and experiment_type fields. :param attrs: Any additional attributes to the :class:`~CHAP.common.models.map.MapConfig` :type attrs: dict :param info: Model parameter validation information. :type info: pydantic.ValidationInfo :raises ValueError: Invalid attribute. :return: Validated `attrs` fields. :rtype: dict """ # Get the map's scan_type for EDD experiments values = info.data station = values['station'] experiment_type = values['experiment_type'] if station in ['id1a3', 'id3a'] and experiment_type == 'EDD': scan_type = cls.get_smb_par_attr(values, 'scan_type') if scan_type is not None: attrs['scan_type'] = scan_type attrs['config_id'] = cls.get_smb_par_attr(values, 'config_id') dataset_id = cls.get_smb_par_attr( values, 'dataset_id', unique=False) if dataset_id is not None: attrs['dataset_id'] = dataset_id if attrs.get('scan_type') is None: return attrs axes_labels = {1: 'fly_labx', 2: 'fly_laby', 3: 'fly_labz', 4: 'fly_ometotal'} if attrs['scan_type'] != 0: attrs['fly_axis_labels'] = [ axes_labels[cls.get_smb_par_attr(values, 'fly_axis0')]] if attrs['scan_type'] in (2, 3, 5): attrs['fly_axis_labels'].append( axes_labels[cls.get_smb_par_attr(values, 'fly_axis1')]) return attrs
[docs] @staticmethod def get_smb_par_attr( class_fields, label, units='-', name=None, unique=True): """Read a SMB-style par file attribute. :param class_fields: Map configuration class fields. :type class_fields: Any :param label: Attribute's field key, the user-defined label for referring to this data in the NeXus file and in other tools. :type label: str :param units: Attribute's field unit, defaults to `'-'`. :type units: str :param name: Attribute's field name, the name with which these raw data were recorded at time of data collection, defaults to `label`. :type name: str, optional. """ if name is None: name = label PointByPointScanData( label=label, data_type='smb_par', units=units, name=name) values = [] for scans in class_fields.get('spec_scans'): for scan_number in scans.scan_numbers: scanparser = scans.get_scanparser(scan_number) try: values.append(scanparser.pars[name]) except Exception: # print( # f'Warning: No value found for par file value "{name}"' # f' on scan {scan_number} in spec file ' # f'{scans.spec_file}.') values.append(None) values = list(set(values)) if len(values) == 1: return values[0] if unique: raise ValueError( f'More than one {name} in map not allowed ({values})') return values
@property def all_scalar_data(self): """Return a list of all instances of :class:`~CHAP.common.models.map.PointByPointScanData` for which this map configuration will collect dataset-like data (as opposed to axes-like data). This will be any and all of the items in the corrections-data-related fields, as well as any additional items in the optional `scalar_data` field. :type: list """ return [getattr(self, label, None) for label in CorrectionsData.reserved_labels() if getattr(self, label, None) is not None] + self.scalar_data @property def coords(self): """Return a dictionary of the values of each independent dimension across the map. :type: list """ raise RuntimeError('property coords not implemented') if not hasattr(self, '_coords'): fly_axis_labels = self.attrs.get('fly_axis_labels', []) coords = {} for dim in self.independent_dimensions: if dim.label in fly_axis_labels: relative = True ndigits = 3 else: relative = False ndigits = None coords[dim.label] = [] for scans in self.spec_scans: for scan_number in scans.scan_numbers: scanparser = scans.get_scanparser(scan_number) for scan_step_index in range( scanparser.spec_scan_npts): coords[dim.label].append(dim.get_value( scans, scan_number, scan_step_index, self.scalar_data, relative, ndigits)) if self.map_type == 'structured': coords[dim.label] = np.unique(coords[dim.label]) self._coords = coords return self._coords @property def dims(self): """Return a list of the independent dimension labels for the map. :type: list """ if not hasattr(self, '_dims'): self._dims = [dim.label for dim in self.independent_dimensions] return self._dims @property def scan_step_indices(self): """Return an ordered list in which we can look up the :class:`~CHAP.common.models.map.SpecScans` object, the scan number, and scan step index for every point on the map. :type: list """ raise RuntimeError('property scan_step_indices not implemented') if not hasattr(self, '_scan_step_indices'): scan_step_indices = [] for scans in self.spec_scans: for scan_number in scans.scan_numbers: scanparser = scans.get_scanparser(scan_number) for scan_step_index in range(scanparser.spec_scan_npts): scan_step_indices.append( (scans, scan_number, scan_step_index)) self._scan_step_indices = scan_step_indices return self._scan_step_indices @property def shape(self): """Return the shape of the map -- a tuple representing the number of unique values of each dimension across the map. :type: tupple """ raise RuntimeError('property shape not implemented') if not hasattr(self, '_shape'): if self.map_type == 'structured': self._shape = tuple([len(v) for k, v in self.coords.items()]) else: self._shape = (len(self.scan_step_indices),) return self._shape
[docs] def get_coords(self, map_index): """Return a dictionary of the coordinate names and values of each independent dimension for a given point on the map. :param map_index: Map index to return coordinates for. :type map_index: tuple :return: Coordinate values. :rtype: dict """ raise RuntimeError('get_coords not implemented') if self.map_type == 'structured': scan_type = self.attrs.get('scan_type', -1) fly_axis_labels = self.attrs.get('fly_axis_labels', []) if (scan_type in (3, 5) and len(self.dims) == len(map_index) + len(fly_axis_labels)): dims = [dim for dim in self.dims if dim not in fly_axis_labels] return {dim:self.coords[dim][i] for dim, i in zip(dims, map_index)} return {dim:self.coords[dim][i] for dim, i in zip(self.dims, map_index)} return {dim:self.coords[dim][map_index[0]] for dim in self.dims}
[docs] def get_detector_data(self, detector_name, map_index): """Return detector data collected by this map for a given point on the map. :param detector_name: Name of the detector for which to return data. Usually the value of the detector's EPICS areaDetector prefix macro, $P. :type detector_name: str :param map_index: Map index to return detector data for. :type map_index: tuple :return: One frame of raw detector data. :rtype: np.ndarray """ raise RuntimeError('get_detector_data not implemented') scans, scan_number, scan_step_index = \ self.get_scan_step_index(map_index) scanparser = scans.get_scanparser(scan_number) return scanparser.get_detector_data(detector_name, scan_step_index)
[docs] def get_scan_step_index(self, map_index): """Return parameters to identify a single SPEC scan step that corresponds to the map point at the index provided. :param map_index: Map point index to identify as a specific SPEC scan step index. :type map_index: tuple :return: :class:`~CHAP.common.models.map.SpecScans` configuration, scan number, and scan step index. :rtype: tuple[SpecScans, int, int] """ raise RuntimeError('get_scan_step_index not implemented') fly_axis_labels = self.attrs.get('fly_axis_labels', []) if self.map_type == 'structured': map_coords = self.get_coords(map_index) for scans, scan_number, scan_step_index in self.scan_step_indices: coords = {dim.label:( dim.get_value( scans, scan_number, scan_step_index, self.scalar_data, True, 3) if dim.label in fly_axis_labels else dim.get_value( scans, scan_number, scan_step_index, self.scalar_data)) for dim in self.independent_dimensions} if coords == map_coords: return scans, scan_number, scan_step_index raise RuntimeError(f'Unable to match coordinates {coords}') return self.scan_step_indices[map_index[0]]
[docs] def get_value(self, data, map_index): """Return the raw data collected by a single device at a single point in the map. :param data: Device configuration to return a value of raw data for. :type data: PointByPointScanData :param map_index: Map index to return raw data for. :type map_index: tuple :return: Raw data value. :rtype: float """ raise RuntimeError('get_value not implemented') scans, scan_number, scan_step_index = \ self.get_scan_step_index(map_index) return data.get_value(scans, scan_number, scan_step_index, self.scalar_data)
[docs] def import_scanparser(station, experiment): """Given the name of a CHESS station and experiment type, import the corresponding subclass of `ScanParser` as `ScanParser`. :param station: Station name ('IDxx', not the beamline acronym). :type station: Literal['id1a3', 'id3a', 'id3b', 'id4b'] :param experiment: Experiment type. :type experiment: Literal[ 'EDD', 'GIWAXS', 'HDRM', 'SAXSWAXS', 'TOMO', 'XRF'] """ # Third party modules # pylint: disable=import-error from chess_scanparsers import choose_scanparser globals()['ScanParser'] = choose_scanparser(station, experiment)