Source code for CHAP.utils.parfile

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Utilities for interacting with scans using an SMB-style par file
as input.
"""

# System modules
import csv
import os

# Third party modules
import json

[docs] class ParFile(): """Representation of a par file. :ivar column_names: List of the names of each column in the par file. :vartype column_names: list[str] :ivar data: A 2D array of the data in this par file: 0th index: row, 1st index: column :vartype data: list[list] :ivar json_file: Name of the json file containing the keys for the column names of the par file. :vartype json_file: str :ivar par_file: Name of the par file. :vartype par_file: str :ivar scann_i: Column index for the scan number column in the par file. :vartype scann_i: int :ivar scan_numbers: Scan numbers for which data is available in the par file. :vartype scan_numbers: list[int] :ivar spec_file: Name of the SPEC data file associated with this par file. :vartype spec_file: str """ def __init__(self, par_file, scan_numbers=None, scann_col_name='SCAN_N'): """Initialize ParFile. :param par_file: Name of the par file. :type par_file: str :param scan_numbers: List of scan numbers to use. :type scan_numbers: int or list[int] or str, optional :param scann_col_name: Column name for scan number in the par file, defaults to 'SCAN_N'. :type scann_col_name: str, optional """ # Local modules from CHAP.utils.general import ( is_int_series, string_to_list, ) self.par_file = str(par_file) self.json_file = self.par_file.replace('.par', '.json') self.spec_file = os.path.join( os.path.dirname(self.par_file), 'spec.log') with open(self.json_file, encoding='utf-8') as json_file: columns = json.load(json_file) num_column = len(columns) self.column_names = [None] * num_column for i, name in columns.items(): self.column_names[int(i)] = name self.data = [] with open(self.par_file, encoding='utf-8') as f: reader = csv.reader(f, delimiter=' ') for i, row in enumerate(reader): if len(row) == 0: continue if row[0].startswith('#'): continue row_data = [] for value in row: try: value = int(value) except ValueError: try: value = float(value) except ValueError: pass row_data.append(value) if len(row_data) != num_column: raise ValueError( 'Mismatch between the number of columns in the json ' f'({num_column}) and line {i+1} of the par file ' f'({len(row_data)})') self.data.append(row_data) self.scann_i = self.column_names.index(scann_col_name) self.scan_numbers = [data[self.scann_i] for data in self.data] if scan_numbers is not None: if isinstance(scan_numbers, int): scan_numbers = [scan_numbers] elif isinstance(scan_numbers, str): scan_numbers = string_to_list(scan_numbers) if not is_int_series(scan_numbers, ge=0, log=False): raise TypeError( f'Invalid scan_numbers parameter ({scan_numbers})') self.scan_numbers = [ n for n in scan_numbers if n in self.scan_numbers]
[docs] def get_map( self, experiment_type, station, par_dims, other_dims=None): """Return a map configuration based on this par file. :param experiment_type: Experiment type name for the map that this par file represents. :type experiment_type: Literal['EDD', 'GIWAXS', 'SAXSWAXS', 'TOMO', 'XRF'] :param station: Station name at which the data were collected. :type station: Literal['id1a3','id3a','id3b'] :param par_dims: List of dictionaries configuring the map's independent dimensions. :type par_dims: list[dict[str, str]] :param other_dims: Other dimensions to include in the returned :class:`~CHAP.common.models.map.MapConfig`'s `independednt_dimensions`. Use this if each scans in this par file captured more than one frame of data. :type other_dims: list[dict[str,str]], optional :return: Map configuration. :rtype: MapConfig """ # Third party modeuls # pylint: disable=import-error from chess_scanparsers import SMBScanParser # pylint: enable=import-error # Local modules from CHAP.common.models.map import MapConfig scanparser = SMBScanParser(self.spec_file, 1) good_scans = self.good_scan_numbers() if other_dims is None: other_dims = [] map_config = { 'title': scanparser.scan_name, 'station': station, #scanparser.station, 'experiment_type': experiment_type, 'sample': {'name': scanparser.scan_name}, 'spec_scans': [ {'spec_file': self.spec_file, 'scan_numbers': good_scans}], 'independent_dimensions': [ {'label': dim['label'], 'units': dim['units'], 'name': dim['name'], 'data_type': 'smb_par'} for dim in par_dims] + other_dims } return MapConfig(**map_config)
[docs] def good_scan_numbers(self, good_col_name='1/0'): """Return the numbers of scans marked with a "1" in the indicated "good" column of the par file. :param good_col_name: Name of the "good" column of the par file, defaults to "1/0" :type good_col_name: str, optional :raises ValueError: If this par file does not have a column with the same name as `good_col_name`. :return: "good" scan numbers. :rtype: list[int] """ good_col_i = self.column_names.index(good_col_name) return [self.scan_numbers[i] for i in range(len(self.scan_numbers)) if self.data[i][good_col_i] == 1]
[docs] def get_values(self, column, scan_numbers=None): """Return values from a single column of the par file. :param column: String name OR index of the column to return values for. :type column: str or int :param scan_numbers: Specific scan numbers to return values in the given column for (instead of the default behavior: return the entire column of values). :type scan_numbers: list[int], optional :raise: ValueError: Unavailable column name. TypeError: Illegal column name type. :return: Values from a single column in the par file. :rtype: list[object] """ if isinstance(column, str): if column in self.column_names: column_idx = self.column_names.index(column) # elif column in ('dataset_id', 'scan_type'): # column_idx = None else: raise ValueError(f'Unavailable column name: {column} not in ' f'{self.column_names}') elif isinstance(column, int): column_idx = column else: raise TypeError(f'Column must be a str or int, not {type(column)}') if column_idx is None: column_data = [None]*len(self.data) else: column_data = [ self.data[i][column_idx] for i in range(len(self.data))] if scan_numbers is not None: column_data = [column_data[self.scan_numbers.index(scan_n)] \ for scan_n in scan_numbers] return column_data
[docs] def map_values(self, map_config, values): """Return a reshaped array of the 1D list `values` so that it matches up with the coordinates of `map_config`. :param map_config: Map configuration according to which values will be reshaped. :type map_config: MapConfig :param values: 1D list of values to reshape. :type values: list or numpy.ndarray :return: Reshaped array of values. :rtype: numpy.ndarray """ # Third party modules import numpy as np good_scans = self.good_scan_numbers() if len(values) != len(good_scans): raise ValueError('Number of values provided ({len(values)}) does ' 'not match the number of good scans in ' f'{self.par_file} ({len(good_scans)})') n_map_points = np.prod(map_config.shape) if len(values) != n_map_points: raise ValueError( f'Cannot reshape {len(values)} values into an array of shape ' f'{map_config.shape}') map_values = np.empty(map_config.shape) for map_index in np.ndindex(map_config.shape): _, scan_number, _ = \ map_config.get_scan_step_index(map_index) value_index = good_scans.index(scan_number) map_values[map_index] = values[value_index] return map_values