#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module for Readers unique to the EDD workflow."""
# System modules
import os
from typing import Optional
# Third party modules
# pylint: disable=import-error
from chess_scanparsers import SMBMCAScanParser as ScanParser
# pylint: enable=import-error
import numpy as np
from pydantic import (
conint,
conlist,
constr,
field_validator,
)
# Local modules
from CHAP.common.models.map import DetectorConfig
from CHAP.reader import Reader
[docs]
class EddMapReader(Reader):
"""Reader for taking an EDD-style par file and returning a
:class:`~CHAP.common.models.map.MapConfig` object representing one
of the datasets in the file. Independent dimensions are determined
automatically, and a specific set of items to use for extra scalar
datasets to include are hard-coded in. The raw data is read if
detector ID's are specified.
:ivar scan_numbers: List of scan numbers to use.
:vartype scan_numbers: int or list[int] or str, optional
:ivar dataset_id: Dataset ID value in the par file to return as a
map, defaults to `1`.
:vartype dataset_id: int, optional
"""
scan_numbers: Optional[
conlist(item_type=conint(gt=0), min_length=1)] = None
dataset_id: Optional[conint(ge=1)] = 1
[docs]
@field_validator('scan_numbers', mode='before')
@classmethod
def validate_scan_numbers(cls, scan_numbers):
"""Validate the specified list of scan numbers.
:param scan_numbers: Scan numbers.
:type scan_numbers: int or list[int] or str
:raises ValueError: If a specified scan number is not found in
the SPEC file.
:return: Scan numbers.
:rtype: list[int]
"""
if isinstance(scan_numbers, int):
scan_numbers = [scan_numbers]
elif isinstance(scan_numbers, str):
# Local modules
from CHAP.utils.general import string_to_list
scan_numbers = string_to_list(scan_numbers)
return scan_numbers
[docs]
def read(self):
"""Return a validated serialized
:class:`~CHAP.common.models.map.MapConfig` object representing
an EDD dataset.
:returns: Map configuration.
:rtype: dict
"""
# Local modules
from CHAP.common.models.map import MapConfig
from CHAP.utils.general import (
list_to_string,
)
from CHAP.utils.parfile import ParFile
parfile = ParFile(self.filename, scan_numbers=self.scan_numbers)
self.logger.debug(f'spec_file: {parfile.spec_file}')
attrs = {}
# Get list of scan numbers for the dataset
try:
dataset_ids = parfile.get_values('dataset_id')
dataset_rows = np.argwhere(np.where(
np.asarray(dataset_ids) == self.dataset_id, 1, 0)).flatten()
except (TypeError, ValueError):
dataset_rows = np.arange(len(parfile.scan_numbers))
attrs['dataset_id'] = 1
scan_nos = [parfile.data[i][parfile.scann_i] for i in dataset_rows
if parfile.data[i][parfile.scann_i] in
parfile.good_scan_numbers()]
if not scan_nos:
raise RuntimeError('Unable to find scans with dataset_id '
f'matching {self.dataset_id}')
self.logger.debug(f'Scan numbers: {list_to_string(scan_nos)}')
spec_scans = [
{'spec_file': parfile.spec_file, 'scan_numbers': scan_nos}]
# Get scan type for this dataset
try:
scan_types = parfile.get_values('scan_type', scan_numbers=scan_nos)
if any([st != scan_types[0] for st in scan_types]):
raise RuntimeError(
'Only one scan type per dataset is suported.')
scan_type = scan_types[0]
except ValueError as e:
# Third party modules
# pylint: disable=import-error
from chess_scanparsers import SMBScanParser
scanparser = SMBScanParser(parfile.spec_file, scan_nos[0])
if scanparser.spec_macro == 'tseries':
scan_type = 0
else:
raise RuntimeError('Old style par files not supported for '
'spec_macro != tseries') from e
attrs['scan_type'] = scan_type
self.logger.debug(f'Scan type: {scan_type}')
# Based on scan type, get independent_dimensions for the map
# Start by adding labx, laby, labz, and omega. Any "extra"
# dimensions will be sqeezed out of the map later.
independent_dimensions = [
{'label': 'labx', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labx'},
{'label': 'laby', 'units': 'mm', 'data_type': 'smb_par',
'name': 'laby'},
{'label': 'labz', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labz'},
{'label': 'ometotal', 'units': 'degrees',
'data_type': 'smb_par', 'name': 'ometotal'},
]
scalar_data = []
if scan_type != 0:
self.logger.warning(
'Assuming all fly axes parameters are identical for all scans')
attrs['fly_axis_labels'] = []
axes_labels = {
1: 'fly_labx', 2: 'fly_laby', 3: 'fly_labz', 4: 'fly_ometotal'}
axes_units = {1: 'mm', 2: 'mm', 3: 'mm', 4: 'degrees'}
axes_added = []
scanparser = ScanParser(parfile.spec_file, scan_nos[0])
def add_fly_axis(fly_axis_index):
"""Add the fly axis info."""
if fly_axis_index in axes_added:
return
fly_axis_key = scanparser.pars[f'fly_axis{fly_axis_index}']
independent_dimensions.append({
'label': axes_labels[fly_axis_key],
'data_type': 'spec_motor',
'units': axes_units[fly_axis_key],
'name': scanparser.spec_scan_motor_mnes[fly_axis_index],
})
axes_added.append(fly_axis_index)
attrs['fly_axis_labels'].append(axes_labels[fly_axis_key])
add_fly_axis(0)
if scan_type in (2, 3, 5):
add_fly_axis(1)
if scan_type == 5:
scalar_data.append({
'label': 'bin_axis', 'units': 'n/a',
'data_type': 'smb_par', 'name': 'bin_axis',
})
attrs['bin_axis_label'] = axes_labels[
scanparser.pars['bin_axis']].replace('fly_', '')
# Add in the usual extra scalar data maps for EDD
scalar_data.append({
'label': 'SCAN_N', 'units': 'n/a', 'data_type': 'smb_par',
'name': 'SCAN_N',
})
if 'rsgap_size' in parfile.column_names:
scalar_data.append({
'label': 'rsgap_size', 'units': 'mm',
'data_type': 'smb_par', 'name': 'rsgap_size',
})
if 'x_effective' in parfile.column_names:
scalar_data.append({
'label': 'x_effective', 'units': 'mm',
'data_type': 'smb_par', 'name': 'x_effective',
})
if 'z_effective' in parfile.column_names:
scalar_data.append({
'label': 'z_effective', 'units': 'mm',
'data_type': 'smb_par', 'name': 'z_effective',
})
# Construct and validate the initial map config dictionary
scanparser = ScanParser(parfile.spec_file, scan_nos[0])
map_config_dict = {
'title': f'{scanparser.scan_name}_dataset{self.dataset_id}',
'station': 'id1a3',
'experiment_type': 'EDD',
'sample': {'name': scanparser.scan_name},
'spec_scans': spec_scans,
'independent_dimensions': independent_dimensions,
'scalar_data': scalar_data,
'presample_intensity': {
'name': 'a3ic1',
'data_type': 'scan_column'},
'postsample_intensity': {
'name': 'diode',
'data_type': 'scan_column'},
'dwell_time_actual': {
'name': 'sec',
'data_type': 'scan_column'},
'attrs': attrs,
}
map_config_dict = MapConfig(**map_config_dict)
# Add lab coordinates to the map's scalar_data only if they
# are NOT already one of the sqeezed map's
# independent_dimensions.
lab_dims = [
{'label': 'labx', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labx'},
{'label': 'laby', 'units': 'mm', 'data_type': 'smb_par',
'name': 'laby'},
{'label': 'labz', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labz'},
{'label': 'ometotal', 'units': 'degrees',
'data_type': 'smb_par', 'name': 'ometotal'},
]
for dim in lab_dims:
if dim not in independent_dimensions:
scalar_data.append(dim)
# Convert list of scan_numbers to string notation
scan_numbers = map_config_dict.spec_scans[0].scan_numbers
map_config_dict.spec_scans[0].scan_numbers = list_to_string(
scan_numbers)
return map_config_dict.model_dump()
[docs]
class EddMPIMapReader(Reader):
"""Reader for taking an EDD-style par file and returning a
:class:`~CHAP.common.models.map.MapConfig` object representing one
of the datasets in the file. Independent dimensions are determined
automatically, and a specific set of items to use for extra scalar
datasets to include are hard-coded in. The raw data is read if
detector ID's are specified.
:ivar dataset_id: Dataset ID value in the par file to return as a
map, defaults to `1`.
:vartype dataset_id: int, optional
:ivar detector_ids: Detector IDs for the raw data.
:vartype detector_ids: int or list[int] or str
"""
dataset_id: Optional[conint(ge=1)] = 1
detector_ids: conlist(item_type=conint(gt=0), min_length=1)
[docs]
@field_validator('detector_ids', mode='before')
@classmethod
def validate_detector_ids(cls, detector_ids):
"""Validate the specified list of detector IDs.
:param detector_ids: Detector IDs.
:type detector_ids: int or list[int] or str
:return: List of Detector IDs.
:rtype: list[int]
"""
if isinstance(detector_ids, int):
detector_ids = [detector_ids]
elif isinstance(detector_ids, str):
# Local modules
from CHAP.utils.general import string_to_list
detector_ids = string_to_list(detector_ids)
return detector_ids
[docs]
def read(self):
"""Return a validated serialized
:class:`~CHAP.common.models.map.MapConfig` object representing
an EDD dataset.
:returns: Map configuration.
:rtype: dict
"""
# Third party modules
# pylint: disable=no-name-in-module
from nexusformat.nexus import (
NXcollection,
NXdata,
NXentry,
NXfield,
NXsample,
)
# pylint: enable=no-name-in-module
# Local modules
from CHAP.common.models.map import MapConfig
from CHAP.utils.parfile import ParFile
parfile = ParFile(self.filename)
self.logger.debug(f'spec_file: {parfile.spec_file}')
# Get list of scan numbers for the dataset
dataset_ids = np.asarray(parfile.get_values('dataset_id'))
dataset_rows = np.argwhere(np.where(
np.asarray(dataset_ids) == self.dataset_id, 1, 0)).flatten()
scan_nos = [parfile.data[i][parfile.scann_i] for i in dataset_rows
if parfile.data[i][parfile.scann_i] in
parfile.good_scan_numbers()]
self.logger.debug(f'Scan numbers: {scan_nos}')
spec_scans = [
{'spec_file': parfile.spec_file, 'scan_numbers': scan_nos}]
# Get scan type for this dataset
scan_types = parfile.get_values('scan_type', scan_numbers=scan_nos)
if any([st != scan_types[0] for st in scan_types]):
msg = 'Only one scan type per dataset is suported.'
self.logger.error(msg)
raise ValueError(msg)
scan_type = scan_types[0]
self.logger.debug(f'Scan type: {scan_type}')
# Based on scan type, get independent_dimensions for the map
# Start by adding labx, laby, labz, and omega. Any "extra"
# dimensions will be sqeezed out of the map later.
independent_dimensions = [
{'label': 'labx', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labx'},
{'label': 'laby', 'units': 'mm', 'data_type': 'smb_par',
'name': 'laby'},
{'label': 'labz', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labz'},
{'label': 'ometotal', 'units': 'degrees',
'data_type': 'smb_par', 'name': 'ometotal'},
]
scalar_data = []
attrs = {}
if scan_type != 0:
self.logger.warning(
'Assuming all fly axes parameters are identical for all scans')
attrs['fly_axis_labels'] = []
axes_labels = {
1: 'fly_labx', 2: 'fly_laby', 3: 'fly_labz', 4: 'fly_ometotal'}
axes_units = {1: 'mm', 2: 'mm', 3: 'mm', 4: 'degrees'}
axes_added = []
scanparser = ScanParser(parfile.spec_file, scan_nos[0])
def add_fly_axis(fly_axis_index):
"""Add the fly axis info."""
if fly_axis_index in axes_added:
return
fly_axis_key = scanparser.pars[f'fly_axis{fly_axis_index}']
independent_dimensions.append({
'label': axes_labels[fly_axis_key],
'data_type': 'spec_motor',
'units': axes_units[fly_axis_key],
'name': scanparser.spec_scan_motor_mnes[fly_axis_index],
})
axes_added.append(fly_axis_index)
attrs['fly_axis_labels'].append(axes_labels[fly_axis_key])
add_fly_axis(0)
if scan_type in (2, 3, 5):
add_fly_axis(1)
if scan_type == 5:
scalar_data.append({
'label': 'bin_axis', 'units': 'n/a',
'data_type': 'smb_par', 'name': 'bin_axis',
})
attrs['bin_axis_label'] = axes_labels[
scanparser.pars['bin_axis']].replace('fly_', '')
# Add in the usual extra scalar data maps for EDD
scalar_data.extend([
{'label': 'SCAN_N', 'units': 'n/a', 'data_type': 'smb_par',
'name': 'SCAN_N'},
{'label': 'rsgap_size', 'units': 'mm',
'data_type': 'smb_par', 'name': 'rsgap_size'},
{'label': 'x_effective', 'units': 'mm',
'data_type': 'smb_par', 'name': 'x_effective'},
{'label': 'z_effective', 'units': 'mm',
'data_type': 'smb_par', 'name': 'z_effective'},
])
# Construct and validate the initial map config dictionary
scanparser = ScanParser(parfile.spec_file, scan_nos[0])
map_config_dict = {
'title': f'{scanparser.scan_name}_dataset{self.dataset_id}',
'station': 'id1a3',
'experiment_type': 'EDD',
'sample': {'name': scanparser.scan_name},
'spec_scans': spec_scans,
'independent_dimensions': independent_dimensions,
'scalar_data': scalar_data,
'presample_intensity': {
'name': 'a3ic1',
'data_type': 'scan_column'},
'postsample_intensity': {
'name': 'diode',
'data_type': 'scan_column'},
'dwell_time_actual': {
'name': 'sec',
'data_type': 'scan_column'},
'attrs': attrs,
}
map_config = MapConfig(**map_config_dict)
# Squeeze out extraneous independent dimensions (dimensions
# along which data were taken at only one unique coordinate
# value)
while 1 in map_config.shape:
remove_dim_index = map_config.shape.index(1)
self.logger.debug(
'Map dimensions: '
+ str([dim["label"] for dim in independent_dimensions]))
self.logger.debug(f'Map shape: {map_config.shape}')
self.logger.debug(
'Sqeezing out independent dimension '
f'{independent_dimensions[remove_dim_index]["label"]}')
independent_dimensions.pop(remove_dim_index)
map_config = MapConfig(**map_config_dict)
self.logger.debug(
'Map dimensions: '
+ str([dim["label"] for dim in independent_dimensions]))
self.logger.debug(f'Map shape: {map_config.shape}')
# Add lab coordinates to the map's scalar_data only if they
# are NOT already one of the sqeezed map's
# independent_dimensions.
lab_dims = [
{'label': 'labx', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labx'},
{'label': 'laby', 'units': 'mm', 'data_type': 'smb_par',
'name': 'laby'},
{'label': 'labz', 'units': 'mm', 'data_type': 'smb_par',
'name': 'labz'},
{'label': 'ometotal', 'units': 'degrees',
'data_type': 'smb_par', 'name': 'ometotal'},
]
for dim in lab_dims:
if dim not in independent_dimensions:
scalar_data.append(dim)
# Set up NXentry and add misc. CHESS-specific metadata
nxentry = NXentry(name=map_config.title)
nxentry.attrs['station'] = map_config.station
nxentry.map_config = map_config.model_dump_json()
nxentry.spec_scans = NXcollection()
for scans in map_config.spec_scans:
nxentry.spec_scans[scans.scanparsers[0].scan_name] = NXfield(
value=scans.scan_numbers,
attrs={'spec_file': str(scans.spec_file)})
# Add sample metadata
nxentry[map_config.sample.name] = NXsample(
**map_config.sample.model_dump())
# Set up default data group
nxentry.data = NXdata()
independent_dimensions = map_config.independent_dimensions
for dim in independent_dimensions:
nxentry.data[dim.label] = NXfield(
units=dim.units,
attrs={'long_name': f'{dim.label} ({dim.units})',
'data_type': dim.data_type,
'local_name': dim.name})
# Read the raw data and independent dimensions
data = [[] for _ in self.detector_ids]
dims = [[] for _ in independent_dimensions]
for scans in map_config.spec_scans:
for scan_number in scans.scan_numbers:
scanparser = scans.get_scanparser(scan_number)
for i, detector_id in enumerate(self.detector_ids):
ddata = scanparser.get_detector_data(detector_id)
data[i].append(ddata)
for i, dim in enumerate(independent_dimensions):
dims[i].append(dim.get_value(
scans, scan_number, scan_step_index=-1, relative=True))
return map_config_dict
[docs]
class ScanToMapReader(Reader):
"""Reader for turning a single SPEC scan into a
:class:`~CHAP.common.models.map.MapConfig` object.
:ivar scan_number: SPEC scan number.
:vartype scan_number: int
"""
scan_number: conint(ge=0)
[docs]
def read(self):
"""Return a validated serialized
:class:`~CHAP.common.models.map.MapConfig` object representing
an EDD dataset.
:returns: Map configuration.
:rtype: dict
"""
scanparser = ScanParser(self.filename, self.scan_number)
if (scanparser.spec_macro in ('tseries', 'loopscan') or
(scanparser.spec_macro == 'flyscan' and
not len(scanparser.spec_args) == 5)):
independent_dimensions = [{
'label': 'Time', 'units': 'seconds',
'data_type': 'scan_column', 'name': 'Time',
}]
else:
independent_dimensions = [
{'label': mne, 'units': 'unknown units',
'data_type': 'spec_motor', 'name': mne}
for mne in scanparser.spec_scan_motor_mnes]
map_config_dict = {
'title': f'{scanparser.scan_name}_{self.scan_number:03d}',
'station': 'id1a3',
'experiment_type': 'EDD',
'sample': {'name': scanparser.scan_name},
'spec_scans': [{
'spec_file': self.filename,
'scan_numbers': [self.scan_number]}],
'independent_dimensions': independent_dimensions,
'presample_intensity': {
'name': 'a3ic1',
'data_type': 'scan_column'},
'postsample_intensity': {
'name': 'diode',
'data_type': 'scan_column'},
'dwell_time_actual': {
'name': 'sec',
'data_type': 'scan_column'},
}
return map_config_dict
[docs]
class SetupNXdataReader(Reader):
"""Reader for converting the SPEC input txt file for EDD dataset
collection to an approporiate input argument for
:class:`~CHAP.common.processor.SetupNXdataProcessor`.
:ivar dataset_id: Dataset ID value in the txt file to return
:meth:`~CHAP.common.SetupNXdataProcessor.process` arguments
for.
:vartype dataset_id: int
:ivar detectors: Detector list.
:vartype detectors: DetectorConfig
"""
dataset_id: conint(ge=1)
detectors: DetectorConfig
[docs]
@field_validator('detectors', mode='before')
@classmethod
def validate_detectors(cls, detectors):
"""Validate the specified list of detectors.
:param detectors: Detectors list.
:type detectors: list[CHAP.common.models.map.Detector]
:return: Detectors list.
:rtype: list[CHAP.common.models.map.Detector]
"""
if detectors is None:
detectors = [{'id': i} for i in range(23)]
return DetectorConfig(detectors=detectors)
[docs]
def read(self):
"""Return a dictionary containing the `coords`, `signals`, and
`attrs` arguments appropriate for use with
:meth:`~CHAP.common.processor.SetupNXdataProcessor.process` to
set up an initial NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
object representing a complete and organized structured EDD
dataset.
:returns: Dataset's coordinate names, values, attributes,
and signal names, shapes, and attributes.
:rtype: dict
"""
# Columns in input txt file:
# 0: scan number
# 1: dataset index
# 2: configuration descriptor
# 3: labx
# 4: laby
# 5: labz
# 6: omega (reference)
# 7: omega (offset)
# 8: dwell time
# 9: beam width
# 10: beam height
# 11: detector slit gap width
# 12: scan type
# Following columns used only for scan types 1 and up and
# specify flyscan/flymesh parameters.
# 13 + 4n: scan direction axis index
# 14 + 4n: lower bound
# 15 + 4n: upper bound
# 16 + 4n: no. points
# (For scan types 1, 4: n = 0)
# (For scan types 2, 3, 5: n = 0 or 1)
# For scan type 5 only:
# 21: bin axis
# Parse dataset from the input txt file.
with open(self.filename, 'r', encoding='utf-8') as f:
file_lines = f.readlines()
dataset_lines = []
for l in file_lines:
vals = l.split()
for i, v in enumerate(vals):
try:
vals[i] = int(v)
except ValueError:
try:
vals[i] = float(v)
except ValueError:
pass
if vals[1] == self.dataset_id:
dataset_lines.append(vals)
# Start inferring coords and signals lists for EDD experiments
self.logger.warning(
'Assuming the following parameters are identical across the '
'entire dataset: scan type, configuration descriptor')
scan_type = dataset_lines[0][12]
self.logger.debug(f'scan_type = {scan_type}')
# Set up even the potential "coordinates" (labx, laby, labz,
# ometotal) as "signals" because we want to force
# common.SetupNXdataProcessor to set up all EDD datasets as
# UNstructured with a single actual coordinate
# (dataset_pont_index).
signals = [
{'name': 'labx', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'mm', 'local_name': 'labx',
'data_type': 'smb_par'}},
{'name': 'laby', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'mm', 'local_name': 'laby',
'data_type': 'smb_par'}},
{'name': 'labz', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'mm', 'local_name': 'labz',
'data_type': 'smb_par'}},
{'name': 'ometotal', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'degrees', 'local_name': 'ometotal',
'data_type': 'smb_par'}},
{'name': 'presample_intensity', 'shape': [], 'dtype': 'uint64',
'attrs': {'units': 'counts', 'local_name': 'a3ic1',
'data_type': 'scan_column'}},
{'name': 'postsample_intensity', 'shape': [], 'dtype': 'uint64',
'attrs': {'units': 'counts', 'local_name': 'diode',
'data_type': 'scan_column'}},
{'name': 'dwell_time_actual', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'seconds', 'local_name': 'sec',
'data_type': 'scan_column'}},
{'name': 'SCAN_N', 'shape': [], 'dtype': 'uint8',
'attrs': {'units': 'n/a', 'local_name': 'SCAN_N',
'data_type': 'smb_par'}},
{'name': 'rsgap_size', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'mm', 'local_name': 'rsgap_size',
'data_type': 'smb_par'}},
{'name': 'x_effective', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'mm', 'local_name': 'x_effective',
'data_type': 'smb_par'}},
{'name': 'z_effective', 'shape': [], 'dtype': 'float64',
'attrs': {'units': 'mm', 'local_name': 'z_effective',
'data_type': 'smb_par'}},
]
# Add each MCA channel to the list of signals
for d in self.detectors:
signals.append(
{'name': d.get_id(), 'attrs': d.attrs, 'dtype': 'uint64',
'shape': d.attrs.get('shape', (4096,))})
# Attributes to attach for use by edd.StrainAnalysisProcessor:
attrs = {'dataset_id': self.dataset_id,
'config_id': dataset_lines[0][2],
'scan_type': scan_type,
'unstructured_axes': ['labx', 'laby', 'labz', 'ometotal']}
# Append additional fly_* signals depending on the scan type
# of the dataset. Also find the number of points / scan.
if scan_type == 0:
scan_npts = 1
fly_axis_values = None
else:
self.logger.warning(
'Assuming scan parameters are identical for all scans.')
axes_labels = {1: 'fly_labx', 2: 'fly_laby', 3: 'fly_labz',
4: 'fly_ometotal'}
axes_units = {1: 'mm', 2: 'mm', 3: 'mm', 4: 'degrees'}
signals.append({
'name': axes_labels[dataset_lines[0][13]],
'attrs': {'units': axes_units[dataset_lines[0][13]],
'relative': True},
'shape': [], 'dtype': 'float64'
})
scan_npts = dataset_lines[0][16]
fly_axis_labels = [axes_labels[dataset_lines[0][13]]]
fly_axis_values = {fly_axis_labels[0]:
np.round(np.linspace(
dataset_lines[0][14], dataset_lines[0][15],
dataset_lines[0][16]), 3)}
scan_shape = (len(fly_axis_values[fly_axis_labels[0]]),)
if scan_type in (2, 3, 5):
signals.append({
'name': axes_labels[dataset_lines[0][17]],
'attrs': {'units': axes_units[dataset_lines[0][17]],
'relative': True},
'shape': [], 'dtype': 'float64'
})
scan_npts *= dataset_lines[0][20]
if scan_type == 5:
attrs['bin_axis'] = axes_labels[dataset_lines[0][21]]
fly_axis_labels.append(axes_labels[dataset_lines[0][17]])
fly_axis_values[fly_axis_labels[-1]] = np.round(
np.linspace(dataset_lines[0][18], dataset_lines[0][19],
dataset_lines[0][20]), 3)
scan_shape = (*scan_shape,
len(fly_axis_values[fly_axis_labels[-1]]))
attrs['fly_axis_labels'] = fly_axis_labels
attrs['unstructured_axes'].extend(fly_axis_labels)
# Set up the single unstructured dataset coordinate
dataset_npts = len(dataset_lines) * scan_npts
coords = [{'name': 'dataset_point_index',
'values': list(range(dataset_npts)),
'attrs': {'units': 'n/a'}}]
# Set up the list of data_points to fill out the known values
# of the physical "coordinates"
data_points = []
for i in range(dataset_npts):
l = dataset_lines[i // scan_npts]
data_point = {
'dataset_point_index': i,
'labx': l[3], 'laby': l[4], 'labz': l[5],
'ometotal': l[6] + l[7]}
if fly_axis_values:
scan_step_index = i % scan_npts
scan_steps = np.ndindex(scan_shape[::-1])
ii = 0
while ii <= scan_step_index:
scan_step = next(scan_steps)
ii += 1
scan_step_indices = scan_step[::-1]
for iii, (k, v) in enumerate(fly_axis_values.items()):
data_point[k] = v[scan_step_indices[iii]]
data_points.append(data_point)
return {'coords': coords, 'signals': signals,
'attrs': attrs, 'data_points': data_points}
[docs]
class SliceNXdataReader(Reader):
"""A reader class to load and slice a NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
field from a NeXus file. This class reads EDD data from a `NXdata`
object and slices all fields according to the provided slicing
parameters.
:ivar scan_number: SPEC scan number.
:vartype scan_number: int
"""
scan_number: conint(ge=0)
[docs]
def read(self):
"""Reads a NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
object from a NeXus file and slices the fields within it based
on the provided scan number.
:raises ValueError: If no `NXdata` object is found in the file.
:return: Root object of the NeXus file with sliced `NXdata`
fields.
:rtype: nexusformat.nexus.NXroot
"""
# Third party modules
from nexusformat.nexus import NXentry, NXfield
# Local modules
from CHAP.common import NexusReader
from CHAP.utils.general import nxcopy
reader = NexusReader(**self.model_dump())
nxroot = nxcopy(reader.read())
nxdata = None
for nxname, nxobject in nxroot.items():
if isinstance(nxobject, NXentry):
nxdata = nxobject.data
if nxdata is None:
msg = 'Could not find NXdata group'
self.logger.error(msg)
raise ValueError(msg)
indices = np.argwhere(
nxdata.SCAN_N.nxdata == self.scan_number).flatten()
for nxname, nxobject in nxdata.items():
if isinstance(nxobject, NXfield):
nxdata[nxname] = NXfield(
value=nxobject.nxdata[indices],
dtype=nxdata[nxname].dtype,
attrs=nxdata[nxname].attrs,
)
return nxroot
[docs]
class UpdateNXdataReader(Reader):
"""Companion to :class:`~CHAP.edd.reader.SetupNXdataReader` and
:class:`~CHAP.common.processor.UpdateNXDataProcessor`. Constructs
a list of data points to pass as pipeline data to
`UpdateNXDataProcessor` so that a NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html>`__
constructed by `SetupNXdataReader` and `UpdateNXDataProcessor` can
be updated live as individual scans in an EDD dataset are
completed.
:ivar detector_ids: Detector IDs.
:vartype detector_ids: int or list[int] or str, optional
:ivar scan_number: SPEC scan number.
:vartype scan_number: int
"""
detector_ids: Optional[
conlist(item_type=conint(gt=0), min_length=1)] = None
scan_number: conint(ge=0)
[docs]
def read(self):
"""Return a list of data points containing raw data values for
a single EDD spec scan. The returned values can be passed
along to :class:`~CHAP.common.processor.UpdateNXdataProcessor`
to fill in an existing NeXus Style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html>`__
object up with
:class:`~CHAP.common.processor.SetupNXdataProcessor`.
:returs: Data points appropriate for input to
`UpdateNXdataProcessor`.
:rtype: list[dict[str, Any]]
"""
# Local modules
from CHAP.utils.parfile import ParFile
scanparser = ScanParser(self.filename, self.scan_number)
self.logger.debug('Parsed scan')
# A label / counter mne dict for convenience
counters = {
'presample_intensity': 'a3ic0',
'postsample_intensity': 'diode',
'dwell_time_actual': 'sec',
}
# Determine the scan's own coordinate axes based on scan type
scan_type = scanparser.pars['scan_type']
self.logger.debug(f'scan_type = {scan_type}')
if scan_type == 0:
scan_axes = []
else:
axes_labels = {1: 'fly_labx', 2: 'fly_laby', 3: 'fly_labz',
4: 'fly_ometotal'}
scan_axes = [axes_labels[scanparser.pars['fly_axis0']]]
if scan_type in (2, 3, 5):
scan_axes.append(axes_labels[scanparser.pars['fly_axis1']])
self.logger.debug(f'Determined scan axes: {scan_axes}')
# Par file values will be the same for all points in any scan
smb_par_values = {}
for smb_par in ('labx', 'laby', 'labz', 'ometotal', 'SCAN_N',
'rsgap_size', 'x_effective', 'z_effective'):
smb_par_values[smb_par] = scanparser.pars[smb_par]
# Get offset for the starting index of this scan's points in
# the entire dataset.
dataset_id = scanparser.pars['dataset_id']
parfile = ParFile(scanparser.par_file)
good_scans = parfile.good_scan_numbers()
n_prior_dataset_scans = sum(
[1 if did == dataset_id and scan_n < self.scan_number else 0
for did, scan_n in zip(
parfile.get_values('dataset_id', scan_numbers=good_scans),
good_scans)])
dataset_point_index_offset = \
n_prior_dataset_scans * scanparser.spec_scan_npts
self.logger.debug(
f'dataset_point_index_offset = {dataset_point_index_offset}')
# Get full data point for every point in the scan
if self.detector_ids is None:
self.detector_ids = list(range(23))
detector_data = scanparser.get_detector_data(self.detector_ids)
detector_data = {id_: detector_data[:,i,:]
for i, id_ in enumerate(self.detector_ids)}
spec_scan_data = scanparser.spec_scan_data
self.logger.info(f'Getting {scanparser.spec_scan_npts} data points')
idx = slice(dataset_point_index_offset,
dataset_point_index_offset + scanparser.spec_scan_npts)
data_points = [
{'nxpath': f'entry/data/{k}',
'value': [v] * scanparser.spec_scan_npts,
'index': idx}
for k, v in smb_par_values.items()]
data_points.extend([
{'nxpath': f'entry/data/{id_}',
'value': data,
'index': idx}
for id_, data in detector_data.items()
])
data_points.extend([
{'nxpath': f'entry/data/{c}',
'value': spec_scan_data[counters[c]],
'index': idx}
for c in counters
])
return data_points
[docs]
class NXdataSliceReader(Reader):
"""Reader for returning a sliced verson of a NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html>`__
object (which represents a full EDD dataset) that contains data
from just a single SPEC scan.
:ivar nxpath: Path to the existing full EDD dataset's `NXdata`
object in `filename`.
:vartype nxpath: str
:ivar scan_number: SPEC scan number.
:vartype scan_number: int
:ivar spec_file: Name of the spec file whose data will be the only
contents of the returned `NXdata`.
:vartype spec_file: str
"""
nxpath: constr(strip_whitespace=True, min_length=1)
scan_number: conint(ge=0)
spec_file: constr(strip_whitespace=True, min_length=1)
[docs]
def read(self):
"""Return a "slice" of an EDD dataset's NeXus style
`NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html>`__
object that represents just the data from one scan in the
dataset.
:returns: `NXdata` object similar to the one at `nxpath` in
`filename`, but containing only the data collected by the
specified spec scan.
:rtype: nexusformat.nexus.NXdata
"""
# Third party modules
from nexusformat.nexus import nxload
# Local modules
from CHAP.common import NXdataReader
from CHAP.utils.parfile import ParFile
# Parse existing NXdata
root = nxload(self.filename)
nxdata = root[self.nxpath]
if nxdata.nxclass != 'NXdata':
raise TypeError(
f'Object at {self.nxpath} in {self.filename} is not an NXdata')
self.logger.debug('Loaded existing NXdata')
# Parse scan
if not os.path.isabs(self.spec_file):
self.spec_file = os.path.join(self.inputdir, self.spec_file)
scanparser = ScanParser(self.spec_file, self.scan_number)
self.logger.debug('Parsed scan')
# Assemble arguments for NXdataReader
axes_names = [a.nxname for a in nxdata.nxaxes]
if nxdata.nxsignal is not None:
signal_name = nxdata.nxsignal.nxname
else:
signal_name = list(nxdata.entries.keys())[0]
attrs = nxdata.attrs
nxfield_params = []
if 'dataset_point_index' in nxdata:
# Get offset for the starting index of this scan's points in
# the entire dataset.
dataset_id = scanparser.pars['dataset_id']
parfile = ParFile(scanparser.par_file)
good_scans = parfile.good_scan_numbers()
n_prior_dataset_scans = sum(
[1 if did == dataset_id and scan_n < self.scan_number else 0
for did, scan_n in zip(
parfile.get_values(
'dataset_id', scan_numbers=good_scans),
good_scans)])
dataset_point_index_offset = \
n_prior_dataset_scans * scanparser.spec_scan_npts
self.logger.debug(
f'dataset_point_index_offset = {dataset_point_index_offset}')
# FIX convert to using CHAPslice
slice_params = {
'start': dataset_point_index_offset,
'end':
dataset_point_index_offset + scanparser.spec_scan_npts + 1,
}
nxfield_params = [{'filename': self.filename,
'nxpath': entry.nxpath,
'slice_params': [slice_params]}
for entry in nxdata]
else:
signal_slice_params = []
for a in nxdata.nxaxes:
if a.nxname.startswith('fly_'):
slice_params = {}
else:
value = scanparser.pars[a.nxname]
try:
index = np.where(a.nxdata == value)[0][0]
except Exception:
index = np.argmin(np.abs(a.nxdata - value))
self.logger.warning(
f'Nearest match for coordinate value {a.nxname}: '
f'{a.nxdata[index]} (actual value: {value})')
# FIX convert to using CHAPslice
slice_params = {'start': index, 'end': index+1}
signal_slice_params.append(slice_params)
nxfield_params.append({
'filename': self.filename,
'nxpath': os.path.join(nxdata.nxpath, a.nxname),
'slice_params': [slice_params],
})
for _, entry in nxdata.entries.items():
if entry in nxdata.nxaxes:
continue
nxfield_params.append({
'filename': self.filename,
'nxpath': entry.nxpath,
'slice_params': signal_slice_params,
})
# Return the "sliced" NXdata
reader = NXdataReader()
reader.logger = self.logger
return reader.read(name=nxdata.nxname, nxfield_params=nxfield_params,
signal_name=signal_name, axes_names=axes_names,
attrs=attrs)
if __name__ == '__main__':
# Local modules
from CHAP.reader import main
main()