# Source code for CHAP.common.reader

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module for generic Readers used in multiple experiment-specific
workflows.
"""

# System modules
from typing import (
    Literal,
    Optional,
    Union,
)

# Third party modules
import numpy as np
from pydantic import (
    PrivateAttr,
    conint,
    conlist,
    constr,
    model_validator,
)

# Local modules
from CHAP.common.models.map import (
    DetectorConfig,
    SpecConfig,
)
from CHAP.reader import (
    Reader,
    validate_reader_model,
)

def validate_model(model):
    """Run the generic reader validation on `model` when it names a
    file.

    Models whose `filename` is `None` are returned untouched.

    :param model: Reader model instance to validate.
    :type model: Any
    :return: The `model` instance that was passed in.
    :rtype: Any
    """
    if model.filename is None:
        return model
    validate_reader_model(model)
    return model
class BinaryFileReader(Reader):
    """Reader that returns the raw bytes of a file."""

    def read(self):
        """Read the file named by `self.filename` in binary mode.

        :return: Entire content of the file.
        :rtype: bytes
        """
        with open(self.filename, 'rb') as stream:
            return stream.read()
class ConfigReader(Reader):
    """Reader for YAML files whose contents can optionally be verified
    against a `Pydantic <https://github.com/pydantic/pydantic>`__
    configuration schema.
    """

    def read(self):
        """Load the YAML file and optionally verify its contents.

        As written, the method raises right after the YAML file has
        been read, so the verification code below the `raise`
        statement does not run.

        :raises RuntimeError: Always, once the YAML file has been read.
        :return: File content.
        :rtype: dict
        """
        yaml_reader = YAMLReader(**self.model_dump())
        contents = yaml_reader.read()
        raise RuntimeError(
            'FIX ConfigReader downstream validators do not like a pydantic '
            'class as output of a reader, but returning data.model_dict() '
            'instead screws up default value identification')
        # Unreachable until the issue described above is resolved
        if self.get_schema() is not None:
            contents = self.get_config(
                config=contents, schema=self.get_schema())
        self.status = 'read'
        return contents
class DetectorDataReader(Reader):
    """Reader for detector data files. Glob filenames allowed. Mask
    application and background correction available.
    """

    def read(self, filename, mask_file=None, mask_above=None,
             mask_below=None, mask_value=np.nan, data_scalar=None,
             background_file=None, background_scalar=None,
    ):
        """Read detector data and apply scaling, masking, and
        background subtraction.

        :param filename: Path to the primary data file, or a glob
            pattern matching one or more data files.
        :type filename: str
        :param mask_file: Path to the mask file (optional). Nonzero
            mask pixels are masked.
        :type mask_file: str, optional
        :param mask_above: Mask values above this threshold
            (optional).
        :type mask_above: float, optional
        :param mask_below: Mask values below this threshold
            (optional).
        :type mask_below: float, optional
        :param mask_value: Value written into masked pixels, defaults
            to `numpy.nan`. The string `'nan'` (any case) is also
            accepted.
        :type mask_value: float | str, optional
        :param data_scalar: Scalar to multiply the data (optional).
        :type data_scalar: float, optional
        :param background_file: Path to the background file
            (optional).
        :type background_file: str, optional
        :param background_scalar: Scalar to multiply the background
            data (optional).
        :type background_scalar: float, optional
        :raises ValueError: If `filename` is neither an existing file
            nor a glob pattern that matches at least one file.
        :return: Processed detector data, one frame per input file.
        :rtype: numpy.ndarray
        """
        # System modules
        import glob
        import os

        # Handle glob filenames
        if os.path.isfile(filename):
            filenames = [filename]
        else:
            filenames = sorted(glob.glob(filename))
            if not filenames:
                raise ValueError(
                    f'{filename} is not a file or glob that matches any '
                    'files')

        # Read the raw data files
        self.logger.info(f'Reading {len(filenames)} raw data files')
        raw_data = [self._get_data_from_file(f) for f in filenames]

        # Initialize mask array
        self.logger.info('Initializing mask array')
        mask_array = np.zeros_like(raw_data[0], dtype=bool)
        if mask_file:
            mask_array |= self._get_data_from_file(mask_file) != 0

        # Initialize background data
        self.logger.info('Initializing background data')
        if background_file:
            background_data = self._get_data_from_file(background_file)
            if background_scalar:
                # Out of place so that integer background images can
                # be scaled by a float
                background_data = background_data * background_scalar
        else:
            background_data = None

        # Handle mask_value of NaN
        if isinstance(mask_value, str) and mask_value.lower() == 'nan':
            mask_value = np.nan

        # Scale data, apply mask, subtract background
        self.logger.info('Applying corrections to raw data')
        return self._correct_data(
            np.array(raw_data), mask_array, mask_above, mask_below,
            mask_value, data_scalar, background_data)

    def _get_data_from_file(self, filename):
        """Return the data array of a single image file opened with
        `fabio`.

        :param filename: Path to the image file.
        :type filename: str
        :return: Value of the opened image's `data` attribute.
        """
        # Third party modules
        import fabio

        self.logger.debug(f'Reading {filename}')
        with fabio.open(filename) as datafile:
            data = datafile.data
        return data

    def _correct_data(
            self, raw_data, mask_array, mask_above, mask_below,
            mask_value, data_scalar, background_data):
        """Scale, mask, and background-correct the raw data.

        The corrections are applied out of place; `raw_data` is not
        modified.

        :param raw_data: Stack of raw frames.
        :type raw_data: numpy.ndarray
        :param mask_array: Per-frame mask; nonzero pixels are masked.
        :type mask_array: numpy.ndarray
        :param mask_above: Upper threshold for masking, or `None`.
        :param mask_below: Lower threshold for masking, or `None`.
        :param mask_value: Value written into masked pixels.
        :param data_scalar: Scalar to multiply the data, skipped when
            falsy.
        :param background_data: Background to subtract, or `None`.
        :return: Corrected data.
        :rtype: numpy.ndarray
        """
        # Scale raw data
        corrected_data = raw_data
        if data_scalar:
            corrected_data = corrected_data * data_scalar

        # Apply mask. The per-frame mask is combined out of place so
        # that it can broadcast against the stack of frames.
        mask = mask_array != 0
        if mask_above is not None:
            mask = mask | (corrected_data > mask_above)
        if mask_below is not None:
            mask = mask | (corrected_data < mask_below)
        corrected_data = np.where(mask, mask_value, corrected_data)

        # Subtract background. An explicit None test is required: the
        # truth value of a multi-element array is ambiguous.
        if background_data is not None:
            corrected_data = corrected_data - background_data

        return corrected_data
class FabioImageReader(Reader):
    """Reader for images using the python
    `fabio <https://fabio.readthedocs.io/en/main>`__ package.

    :ivar frame: Index of a specific frame to read from the file(s),
        defaults to `None`.
    :vartype frame: int, optional
    """
    frame: Optional[conint(ge=0)] = None

    def read(self):
        """Return the data from the image file(s) provided.

        `self.filename` is expanded as a glob pattern; the result is
        always a list, with one entry per matching file (empty when
        nothing matches).

        :returns: Image data of every matching file.
        :rtype: list[numpy.ndarray]
        """
        # System modules
        from glob import glob

        # Third party modules
        import fabio

        data = []
        for f in glob(self.filename):
            # Context manager closes the image even if reading fails
            with fabio.open(f, frame=self.frame) as image:
                data.append(image.data)
        return data
class H5Reader(Reader):
    """Reader for h5 files.

    :ivar h5path: Path to a specific location in the h5 file to read
        data from, defaults to `'/'`.
    :vartype h5path: str, optional
    :ivar idx: Data slice to read from the object at the specified
        location in the h5 file.
    :vartype idx: list[int], optional
    """
    h5path: Optional[constr(strip_whitespace=True, min_length=1)] = '/'
    idx: Optional[conlist(min_length=1, max_length=3, item_type=int)] = None

    def read(self):
        """Open the h5 file read-only and return what is stored at
        `h5path`, indexed by `idx` when one is given.

        The file is not closed by this method.

        :return: Object indicated by `filename` and `h5path`.
        :rtype: Any
        """
        # Third party modules
        from h5py import File

        h5object = File(self.filename, 'r')[self.h5path]
        if self.idx is None:
            return h5object
        return h5object[tuple(self.idx)]
class LinkamReader(Reader):
    """Reader for loading Linkam load frame .txt files as an
    `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
    object.

    :ivar columns: Column names to read in, defaults to None (read in
        all columns)
    :vartype columns: list[str], optional
    """
    columns: Optional[conlist(
        item_type=constr(strip_whitespace=True, min_length=1))] = None

    def read(self):
        """Read specified columns from the given Linkam file.

        The `Time` axis is offset by the start time parsed from the
        filename. When the filename carries no timestamp, a warning is
        logged and the recorded `Time` values are used without an
        offset.

        :returns: Linkam data.
        :rtype: nexusformat.nexus.NXdata
        """
        # Third party modules
        from nexusformat.nexus import (
            NXdata,
            NXfield,
        )

        # Parse .txt file
        start_time, metadata, data = LinkamReader.parse_file(
            self.filename, self.logger)
        if start_time is None:
            # parse_file already warned that no timestamp was found;
            # adding None to the Time array would raise a TypeError
            self.logger.warning(
                f'No start time available for {self.filename}, using the '
                'recorded Time values without an absolute offset')
            start_time = 0.0

        # Get list of actual data column names and corresponding
        # signal nxnames (same as user-supplied column names)
        signal_names = []
        if self.columns is None:
            signal_names = [(col, col) for col in data.keys() if col != 'Time']
        else:
            for col in self.columns:
                col_actual = col
                if col == 'Distance':
                    col_actual = 'Force V Distance_X'
                elif col == 'Force':
                    col_actual = 'Force V Distance_Y'
                elif col not in data:
                    if f'{col}_Y' in data:
                        # Always use the *_Y column if the user-supplied
                        # column name has both _X and _Y components
                        col_actual = f'{col}_Y'
                    else:
                        self.logger.warning(
                            f'{col} not present in {self.filename}')
                        continue
                signal_names.append((col_actual, col))
        self.logger.info(f'Using (column name, signal name): {signal_names}')

        nxdata = NXdata(
            axes=(NXfield(
                name='Time',
                value=np.array(data['Time']) + start_time,
                dtype='float64',
            ),),
            **{col: NXfield(
                name=col,
                value=data[col_actual],
                dtype='float32',
            ) for col_actual, col in signal_names},
            attrs=metadata
        )
        return nxdata

    @classmethod
    def parse_file(cls, filename, logger):
        """Return start time, metadata, and data stored in the provided
        Linkam .txt file.

        :param filename: Name of the Linkam .txt file.
        :type filename: str
        :param logger: Logger used for warnings and info messages.
        :type logger: logging.Logger
        :returns: Start time (`None` if the filename contains no
            timestamp), metadata, and data stored in the input file
            (`False` if no data section was found).
        :rtype: tuple(float, dict[str, str], dict[str, list[float]])
        """
        # System modules
        from datetime import datetime
        import os
        import re

        # Get t=0 from filename
        start_time = None
        basename = os.path.basename(filename)
        pattern = r'(\d{2}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{2})'
        match = re.search(pattern, basename)
        if match:
            datetime_str = match.group(1)
            dt = datetime.strptime(datetime_str, '%d-%m-%y_%H-%M-%S-%f')
            start_time = dt.timestamp()
        else:
            logger.warning(f'Datetime not found in {filename}')

        # Get data and metadata from file contents
        metadata = {}
        data = False
        with open(filename, 'r', encoding='utf-8') as inf:
            for line in inf:
                line = line.strip()
                if not line:
                    continue
                if data:
                    # If data dict has been initialized, remaining
                    # lines are all data values
                    values = line.replace(',', '').split('\t')
                    for val, col in zip(values, data):
                        try:
                            number = float(val)
                        except ValueError as exc:
                            logger.warning(
                                f'Cannot convert {col} value to float: {val} '
                                f'({exc})')
                        else:
                            data[col].append(number)
                    # A data line must never be reinterpreted as
                    # metadata or as a new column header
                    continue
                if ':' in line:
                    # Metadata key: value pair kept on this line
                    key, val = line.split(':', 1)
                    metadata[key] = val
                if re.match(r'^([\w\s\w]+)(\t\t[\w\s\w]+)*$', line):
                    # Match found for start of data section -- this
                    # line and the next are column labels.
                    data_cols = []
                    # Get base quantity column names
                    base_cols = line.split('\t\t')
                    # Get Index, X and Y component columns
                    line = next(inf)
                    comp_cols = line.split('\t')
                    # Assemble actual column names
                    data_cols.append('Index')
                    comp_cols_count = int((len(comp_cols) - 1) / 2)
                    for i in range(comp_cols_count):
                        data_cols.extend(
                            [f'{base_cols[i]}_{comp}' for comp in ('X', 'Y')]
                        )
                    if len(base_cols) > comp_cols_count:
                        data_cols.extend(base_cols[comp_cols_count - 1:])
                    # First column (after 0th) is actually Time
                    data_cols[1] = 'Time'
                    # Start of data lines
                    data = {col: [] for col in data_cols}
                    logger.info(f'Found data columns: {data_cols}')

        return start_time, metadata, data
class MapReader(Reader):
    """Reader for CHESS sample maps.

    Obsolete: `read` raises a `RuntimeError` pointing to
    `MapProcessor` before doing any work.
    """
    def read(
            self, filename=None, map_config=None, detector_names=None):
        """Take a map configuration dictionary and return a
        representation of the map as a NeXus style
        `NXentry <https://manual.nexusformat.org/classes/base_classes/NXentry.html#index-0>`__
        object. The NXentry's default data group will contain the raw
        data collected over the course of the map.

        This method currently raises immediately after its local
        imports; everything below the `raise` statement is unreachable
        legacy code.

        :param filename: Name of a file with the map configuration to
            read and pass onto the constructor of
            :class:`~CHAP.common.models.map.MapConfig`.
        :type filename: str, optional
        :param map_config: Map configuration to be passed directly to
            the constructor of
            :class:`~CHAP.common.models.map.MapConfig`.
        :type map_config: dict, optional
        :param detector_names: Detector prefixes to include raw data
            for in the returned NXentry object.
        :type detector_names: list[str], optional
        :raises RuntimeError: Always (the reader is obsolete).
        :return: Data from the provided map configuration.
        :rtype: nexusformat.nexus.NXentry
        """
        # System modules
        import os

        # Third party modules
        from nexusformat.nexus import (
            NXcollection,
            NXdata,
            NXentry,
            NXfield,
            NXsample,
        )

        # Local modules
        from CHAP.common.models.map import MapConfig

        raise RuntimeError('MapReader is obsolete, use MapProcessor')
        # --- Everything below is unreachable legacy code ---
        if filename is not None:
            if map_config is not None:
                raise RuntimeError('Specify either filename or map_config '
                                   'in common.MapReader, not both')
            # Read the map configuration from file
            if not os.path.isfile(filename):
                raise OSError(f'input file does not exist ({filename})')
            extension = os.path.splitext(filename)[1]
            if extension in ('.yml', '.yaml'):
                # NOTE(review): YAMLReader.read in this module takes no
                # filename argument, so this call pattern looks stale
                reader = YAMLReader()
            else:
                raise RuntimeError('input file has a non-implemented '
                                   f'extension ({filename})')
            map_config = reader.read(filename)
        elif not isinstance(map_config, dict):
            raise RuntimeError('Invalid parameter map_config in '
                               f'common.MapReader ({map_config})')

        # Validate the map configuration provided by constructing a
        # MapConfig
        map_config = MapConfig(**map_config, inputdir=self.inputdir)

        # Set up NXentry and add misc. CHESS-specific metadata
        nxentry = NXentry(name=map_config.title)
        nxentry.attrs['station'] = map_config.station
        nxentry.map_config = map_config.model_dump_json()
        nxentry.spec_scans = NXcollection()
        for scans in map_config.spec_scans:
            nxentry.spec_scans[scans.scanparsers[0].scan_name] = \
                NXfield(value=scans.scan_numbers,
                        attrs={'spec_file': str(scans.spec_file)})

        # Add sample metadata
        nxentry[map_config.sample.name] = NXsample(
            **map_config.sample.model_dump())

        # Set up default data group
        nxentry.data = NXdata()
        if map_config.map_type == 'structured':
            nxentry.data.attrs['axes'] = map_config.dims
        for dim in map_config.independent_dimensions:
            nxentry.data[dim.label] = NXfield(
                value=map_config.coords[dim.label],
                units=dim.units,
                attrs={'long_name': f'{dim.label} ({dim.units})',
                       'data_type': dim.data_type,
                       'local_name': dim.name})

        # Create empty NXfields for all scalar data present in the
        # provided map configuration
        signal = False
        auxilliary_signals = []
        for data in map_config.all_scalar_data:
            nxentry.data[data.label] = NXfield(
                value=np.zeros(map_config.shape),
                units=data.units,
                attrs={'long_name': f'{data.label} ({data.units})',
                       'data_type': data.data_type,
                       'local_name': data.name})
            # The first scalar data label becomes the signal, all
            # later ones are collected as auxiliary signals
            if not signal:
                signal = data.label
            else:
                auxilliary_signals.append(data.label)

        if signal:
            nxentry.data.attrs['signal'] = signal
            nxentry.data.attrs['auxilliary_signals'] = auxilliary_signals

        # Create empty NXfields of appropriate shape for raw
        # detector data
        if detector_names is None:
            detector_names = []
        for detector_name in detector_names:
            if not isinstance(detector_name, str):
                detector_name = str(detector_name)
            # Data at the first map index supplies the detector frame
            # shape and dtype
            detector_data = map_config.get_detector_data(
                detector_name, (0,) * len(map_config.shape))
            nxentry.data[detector_name] = NXfield(value=np.zeros(
                (*map_config.shape, *detector_data.shape)),
                dtype=detector_data.dtype)

        # Read and fill in maps of raw data
        if len(map_config.all_scalar_data) > 0 or detector_names:
            for map_index in np.ndindex(map_config.shape):
                for data in map_config.all_scalar_data:
                    nxentry.data[data.label][map_index] = map_config.get_value(
                        data, map_index)
                for detector_name in detector_names:
                    if not isinstance(detector_name, str):
                        detector_name = str(detector_name)
                    nxentry.data[detector_name][map_index] = \
                        map_config.get_detector_data(detector_name, map_index)

        return nxentry
class PandasReader(Reader):
    """Reader for files that can be read in with
    `pandas <https://pandas.pydata.org/docs/index.html>`__
    """

    def read(self, filename, method='read_csv', comment='#', kwargs=None):
        """Return a `pandas.DataFrame` read from the given file.

        :param filename: Name of file to read from.
        :type filename: str
        :param method: Name of `pandas` method to use for reading from
            `filename`. Defaults to `'read_csv'`.
        :type method: str, optional
        :param comment: Character to identify comment lines in the
            input file, defaults to `'#'`.
        :type comment: str, optional
        :param kwargs: Additional keyword arguments to supply to the
            `pandas` reader.
        :type kwargs: dict, optional
        :raises ValueError: If `method` does not name a callable
            attribute of the `pandas` module.
        :raises TypeError: If `kwargs` is not a dictionary.
        :return: Result of calling the selected `pandas` reader.
        :rtype: `pandas.DataFrame`
        """
        # Third party modules
        # pylint: disable=import-error
        import pandas as pd

        # Default of None lets an unknown method name reach the
        # ValueError below instead of a bare AttributeError
        reader = getattr(pd, method, None)
        if not callable(reader):
            raise ValueError(
                f'{method} is not a callable pandas reader method')
        if kwargs is None:
            kwargs = {}
        if not isinstance(kwargs, dict):
            raise TypeError(
                f'Invalid kwargs type ({type(kwargs)}, should be dict)')
        return reader(filename, comment=comment, **kwargs)
class NexusReader(Reader):
    """Reader for `NeXus <https://www.nexusformat.org>`__ files.

    :ivar nxpath: Path to a specific location in the NeXus file tree
        to read from, defaults to `'/'`.
    :vartype nxpath: str, optional
    :ivar idx: Index of array to select, defaults to `None`
    :vartype idx: int, optional
    :ivar mode: File mode, defaults to 'r'.
    :vartype mode: Literal['r', 'rw', 'r+', 'w', 'a'], optional
    :ivar nxmemory: Maximum memory usage when reading NeXus files.
    :vartype nxmemory: int, optional
    """
    nxpath: Optional[constr(strip_whitespace=True, min_length=1)] = '/'
    idx: Optional[conint(ge=0)] = None
    mode: Literal['r', 'rw', 'r+', 'w', 'a'] = 'r'
    nxmemory: Optional[conint(gt=0)] = None

    def read(self):
        """Load the NeXus file and return the
        `NXobject <https://manual.nexusformat.org/classes/base_classes/NXobject.html#index-0>`__
        found at `nxpath`, indexed by `idx` when one is given.

        When `nxmemory` is set, it is passed to `nxsetconfig` before
        the file is loaded.

        :raises nexusformat.nexus.NeXusError: If `filename` is not a
            NeXus file or `nxpath` is not in its tree.
        :return: NeXus object indicated by `filename` and `nxpath`.
        :rtype: nexusformat.nexus.NXobject
        """
        # Third party modules
        from nexusformat.nexus import (
            nxload,
            nxsetconfig,
        )

        if self.nxmemory is not None:
            nxsetconfig(memory=self.nxmemory)
        nxobject = nxload(self.filename, mode=self.mode)[self.nxpath]
        if self.idx is None:
            return nxobject
        return nxobject[self.idx]
class NXdataReader(Reader):
    """Reader for constructing a NeXus style
    `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
    object from components."""

    def read(self, name, nxfield_params, signal_name, axes_names, attrs=None):
        """Return a basic NeXus style
        `NXdata <https://manual.nexusformat.org/classes/base_classes/NXdata.html#index-0>`__
        object constructed from components.

        :param name: NXdata group name.
        :type name: str
        :param nxfield_params: List of sets of parameters for
            :class:`~CHAP.common.reader.NXfieldReader` specifying the
            `NXfield <https://nexpy.github.io/nexpy/treeapi.html#nexusformat.nexus.tree.NXfield>`__
            objects belonging to the NXdata object.
        :type nxfield_params: list[dict]
        :param signal_name: Name of the signal for the NXdata (must be
            one of the names of the NXfields indicated in
            `nxfield_params`).
        :type signal_name: str
        :param axes_names: Name or names of the coordinate axes
            NXfields associated with the signal (must be names of
            NXfields indicated in `nxfield_params`).
        :type axes_names: str | list[str]
        :param attrs: Additional configuration attributes.
        :type attrs: dict, optional
        :raises ValueError: If `signal_name` or one of `axes_names` is
            not among the NXfields read in, or if an axis length does
            not match the corresponding signal dimension.
        :returns: NXdata object.
        :rtype: nexusformat.nexus.NXdata
        """
        # Third party modules
        from nexusformat.nexus import NXdata

        # Read in NXfields
        nxfields = [NXfieldReader().read(**params, inputdir=self.inputdir)
                    for params in nxfield_params]
        nxfields = {nxfield.nxname: nxfield for nxfield in nxfields}

        # Get signal NXfield
        try:
            nxsignal = nxfields[signal_name]
        except Exception as exc:
            # Explicit "+": implicit literal concatenation would turn
            # the whole sentence into the join separator
            raise ValueError(
                '`signal_name` must be the name of one of the NXfields '
                'indicated in `nxfields`: '
                + ', '.join(nxfields.keys())) from exc

        # Get axes NXfield(s)
        if isinstance(axes_names, str):
            axes_names = [axes_names]
        try:
            nxaxes = [nxfields[axis_name] for axis_name in axes_names]
        except Exception as exc:
            raise ValueError(
                '`axes_names` must contain only names of NXfields indicated '
                'in `nxfields`: ' + ', '.join(nxfields.keys())) from exc
        # The i-th axis must have as many values as the i-th signal
        # dimension
        for i, nxaxis in enumerate(nxaxes):
            if len(nxaxis) != nxsignal.shape[i]:
                raise ValueError(
                    f'Shape mismatch on signal dimension {i}: signal '
                    + f'"{nxsignal.nxname}" has {nxsignal.shape[i]} values, '
                    + f'but axis "{nxaxis.nxname}" has {len(nxaxis)} values.')

        if attrs is None:
            attrs = {}
        result = NXdata(signal=nxsignal, axes=nxaxes, name=name, attrs=attrs,
                        **nxfields)
        self.logger.info(result.tree)
        return result
class NXfieldReader(Reader):
    """Reader for a NeXus style
    `NXfield <https://nexpy.github.io/nexpy/treeapi.html#nexusformat.nexus.tree.NXfield>`__
    with options to modify certain attributes.
    """

    def read(self, nxpath, nxname=None, update_attrs=None,
             slice_params=None):
        """Return a copy of the indicated NeXus style
        `NXfield <https://nexpy.github.io/nexpy/treeapi.html#nexusformat.nexus.tree.NXfield>`__
        object from the file. Name and attributes of the returned
        copy may be modified with the `nxname` and `update_attrs`
        keyword arguments.

        :param nxpath: Path in the NeXus file pointing to the NXfield
            to read.
        :type nxpath: str
        :param nxname: New name for the returned NXfield.
        :type nxname: str, optional
        :param update_attrs: Optional dictionary used to add to /
            update the original NXfield's attributes.
        :type update_attrs: dict, optional
        :param slice_params: Parameters for returning just a slice of
            the full field data. Slice parameters are provided in a
            list of dictionaries with integer values for any / all of
            the following keys: `"start"`, `"end"`, `"step"`. Default
            values used are: `"start"` -- `0`, `"end"` -- `None`,
            `"step"` -- `1`. The order of the list must correspond to
            the order of the field's axes. Missing trailing entries
            select the full axis, surplus entries are ignored. The
            list and its dictionaries are not modified.
        :type slice_params: list[dict[str, int]], optional
        :returns: Copy of the indicated NXfield (with name and
            attributes optionally modified).
        :rtype: nexusformat.nexus.NXfield
        """
        # Third party modules
        from nexusformat.nexus import (
            NXfield,
            nxload,
        )

        nxroot = nxload(self.filename)
        nxfield = nxroot[nxpath]

        if nxname is None:
            nxname = nxfield.nxname

        attrs = nxfield.attrs
        if update_attrs is not None:
            attrs.update(update_attrs)

        if slice_params is None:
            value = nxfield.nxdata
        else:
            # Work on a copy trimmed / padded to one entry per axis so
            # that the caller's list and dictionaries stay untouched
            params = list(slice_params)[:nxfield.ndim]
            params.extend({} for _ in range(nxfield.ndim - len(params)))
            # FIX convert to using CHAPSlice
            slices = tuple(
                slice(s.get('start', 0), s.get('end'), s.get('step', 1))
                for s in params)
            value = nxfield.nxdata[slices]

        nxfield = NXfield(value=value, name=nxname, attrs=attrs)
        self.logger.debug(f'Result -- nxfield.tree =\n{nxfield.tree}')

        return nxfield
class SpecReader(Reader):
    """Reader for CHESS SPEC scans.

    :ivar config: SPEC configuration to be passed directly to the
        constructor of :class:`~CHAP.common.models.map.SpecConfig`.
    :vartype config: dict | SpecConfig, optional
    :ivar detector_config: Detector configurations of the detectors to
        include raw data for in the returned NeXus
        `NXroot <https://manual.nexusformat.org/classes/base_classes/NXroot.html#index-0>`__
        object, defaults to None (only a valid input for EDD).
    :vartype detector_config: DetectorConfig, optional
    :ivar filename: Name of file to read from.
    :vartype filename: str, optional
    """
    config: Optional[Union[dict, SpecConfig]] = None
    detector_config: Optional[DetectorConfig] = None
    filename: Optional[str] = None

    # NOTE(review): PrivateAttr(...) is used here as the annotation,
    # not as the assigned value, so the `default=None` may not take
    # effect -- confirm against the pydantic private-attribute docs.
    _mapping_filename: PrivateAttr(default=None)

    _validate_filename = model_validator(mode='after')(validate_model)

    @model_validator(mode='after')
    def validate_specreader_after(self):
        """Validate the `SpecReader` configuration.

        Loads the configuration from `filename` when one is given,
        converts it with `get_config` using the
        `common.models.map.SpecConfig` schema, and requires
        `detector_config` for every experiment type other than EDD.

        :raises ValueError: If both `filename` and `config` are given.
        :raises RuntimeError: If `detector_config` is missing for a
            non-EDD experiment type.
        :return: Validated configuration.
        :rtype: SpecReader
        """
        if self.filename is not None:
            if self.config is not None:
                raise ValueError('Specify either filename or config in '
                                 'common.SpecReader, not both')
            self.config = YAMLReader(**self.model_dump()).read()
        self.config = self.get_config(
            config=self.config, schema='common.models.map.SpecConfig')
        if self.detector_config is None:
            if self.config.experiment_type != 'EDD':
                raise RuntimeError(
                    'Missing parameter detector_config for experiment type '
                    f'{self.config.experiment_type}')
        return self

    def read(self):
        """Take a SPEC configuration filename or dictionary and return
        the raw data as a NeXus style
        `NXroot <https://manual.nexusformat.org/classes/base_classes/NXroot.html#index-0>`__
        object.

        :raises NotImplementedError: For EDD data that mixes detector
            data formats, or that uses the `spec` detector data format
            where it is not supported.
        :return: Data from the provided SPEC configuration.
        :rtype: nexusformat.nexus.NXroot
        """
        # System modules
        from json import dumps

        # Third party modules
        # pylint: disable=no-name-in-module
        from nexusformat.nexus import (
            NXcollection,
            NXdata,
            NXentry,
            NXfield,
            NXroot,
        )
        # pylint: enable=no-name-in-module

        # Local modules
        from CHAP.common.models.map import Detector

        # Create the NXroot object
        nxroot = NXroot()
        nxentry = NXentry(name=self.config.experiment_type)
        nxroot[nxentry.nxname] = nxentry

        # Set up NXentry and add misc. CHESS-specific metadata as well
        # as all spec_motors, scan_columns, and smb_pars, and the
        # detector info and raw detector data
        nxentry.config = self.config.model_dump_json()
        nxentry.attrs['station'] = self.config.station
        nxentry.spec_scans = NXcollection()
#        nxpaths = []
        if self.config.experiment_type == 'EDD':
            detector_data_format = None
        for scans in self.config.spec_scans:
            nxscans = NXcollection()
            nxentry.spec_scans[f'{scans.scanparsers[0].scan_name}'] = nxscans
            nxscans.attrs['spec_file'] = str(scans.spec_file)
            nxscans.attrs['scan_numbers'] = scans.scan_numbers
            for scan_number in scans.scan_numbers:
                scanparser = scans.get_scanparser(scan_number)
                if self.config.experiment_type == 'EDD':
                    # All scans must share one detector data format
                    if detector_data_format is None:
                        detector_data_format = scanparser.detector_data_format
                    elif (scanparser.detector_data_format !=
                            detector_data_format):
                        raise NotImplementedError(
                            'Mixing `spec` and `h5` data formats')
                    if self.detector_config is None:
                        detectors_ids = None
                    elif detector_data_format == 'spec':
                        raise NotImplementedError(
                            'detector_data_format = "spec"')
                    else:
                        detectors_ids = [
                            int(d.get_id())
                            for d in self.detector_config.detectors]
                nxscans[scan_number] = NXcollection()
                # The four metadata entries below are best effort: a
                # failure to collect or serialize one of them leaves
                # that entry out instead of aborting the read.
                # NOTE(review): the exceptions are swallowed silently.
                try:
                    nxscans[scan_number].spec_motors = dumps(
                        {k:float(v) for k,v
                         in scanparser.spec_positioner_values.items()})
                except Exception:
                    pass
                try:
                    nxscans[scan_number].scan_columns = dumps(
                        {k:list(v) for k,v
                         in scanparser.spec_scan_data.items() if len(v)})
                except Exception:
                    pass
                try:
                    nxscans[scan_number].smb_pars = dumps(
                        dict(scanparser.pars.items()))
                except Exception:
                    pass
                try:
                    nxscans[scan_number].spec_scan_motor_mnes = dumps(
                        scanparser.spec_scan_motor_mnes)
                except Exception:
                    pass
                if self.config.experiment_type == 'EDD':
                    nxdata = NXdata()
                    nxscans[scan_number].data = nxdata
#                    nxpaths.append(
#                        f'spec_scans/{nxscans.nxname}/{scan_number}/data')
                    nxdata.data = NXfield(
                        value=scanparser.get_detector_data(detectors_ids)[0])
                else:
                    if self.config.experiment_type == 'TOMO':
                        dtype = np.float32
                    else:
                        dtype = None
                    nxdata = NXdata()
                    nxscans[scan_number].data = nxdata
#                    nxpaths.append(
#                        f'spec_scans/{nxscans.nxname}/{scan_number}/data')
                    for detector in self.detector_config.detectors:
                        if self.detector_config.roi is None:
                            detector_roi = None
                        else:
                            detector_roi = [
                                self.detector_config.roi[0].toslice(),
                                self.detector_config.roi[1].toslice()]
                        # Diagnostics go to the logger, not to stdout
                        self.logger.debug(f'detector_roi: {detector_roi}')
                        nxdata[detector.get_id()] = NXfield(
                            value=scanparser.get_detector_data(
                                detector.get_id(),
                                detector_roi=detector_roi,
                                dtype=dtype))

        if (self.config.experiment_type == 'EDD'
                and self.detector_config is None):
            if detector_data_format == 'spec':
                raise NotImplementedError('detector_data_format = "spec"')
            # No detectors were specified: create one Detector per
            # entry along axis 1 of the last scan's data.
            # NOTE(review): `nxdata` is only bound if at least one
            # scan was read -- confirm empty scan lists cannot occur.
            self.detector_config = DetectorConfig(
                detectors=[
                    Detector(id=i) for i in range(nxdata.data.shape[1])])
        nxentry.detectors = self.detector_config.model_dump_json()

        #return nxroot, nxpaths
        return nxroot
class URLReader(Reader):
    """Reader for data available over HTTPS."""

    def read(self, url, headers=None, timeout=10):
        """Issue a GET request for `url` and return the body of the
        response. The response content is also logged at debug level.

        :param url: URL to read.
        :type url: str
        :param headers: Headers to attach to the request, defaults to
            no extra headers.
        :type headers: dict, optional
        :param timeout: Timeout for the HTTPS request, defaults to
            `10`.
        :type timeout: int
        :return: Content of the response.
        :rtype: Any
        """
        # Third party modules
        import requests

        request_headers = {} if headers is None else headers
        response = requests.get(
            url, headers=request_headers, timeout=timeout)
        payload = response.content
        self.logger.debug(f'Response content: {payload}')
        return payload
class YAMLReader(Reader):
    """Reader for YAML files."""

    def read(self):
        """Parse the file named by `self.filename` with
        `yaml.safe_load`.

        :return: Contents of the file.
        :rtype: dict
        """
        # Third party modules
        import yaml

        with open(self.filename, encoding='utf-8') as stream:
            return yaml.safe_load(stream)
class ZarrReader(Reader):
    """Reader for `Zarr <https://zarr.readthedocs.io/en/stable/>`__
    stores.
    """

    def read(self, filename, path='/', idx=None, mode='r'):
        """Open a Zarr store and return the
        `Zarr <https://zarr.readthedocs.io/en/stable/>`__ object found
        at `path`, indexed by `idx` when one is given.

        :param filename: Path or URL to the Zarr store.
        :type filename: str
        :param path: Path to a specific location in the Zarr hierarchy
            to read from, defaults to `'/'`. Both `'/'` and `''`
            select the object returned by `zarr.open` itself.
        :type path: str, optional
        :param idx: Optional index or slice to apply to the returned
            object.
        :type idx: int, slice, tuple, optional
        :param mode: Store access mode, defaults to `'r'`. Common
            values: `'r'`, `'r+'`, `'a'`, `'w'`.
        :type mode: str, optional
        :raises KeyError: If `path` does not exist in the store.
        :return: Zarr array or group indicated by `filename` and
            `path`.
        :rtype: zarr.core.Array or zarr.hierarchy.Group
        """
        # Third party modules
        # pylint: disable=import-error
        import zarr

        # Open the Zarr store (directory, zip, or URL)
        root = zarr.open(filename, mode=mode)

        # Leading slashes are removed for Zarr traversal
        at_root = path in ('/', '')
        node = root if at_root else root[path.lstrip('/')]

        # Optional indexing (arrays only)
        return node if idx is None else node[idx]
# When run as a script, delegate to `main` from CHAP.reader
if __name__ == '__main__':
    # Local modules
    from CHAP.reader import main

    main()