CHAP package
Base PipelineItems used in running a ChessAnalysisPipeline (CHAP).
CHAP provides infrastructure to construct and run X-ray data processing and analysis workflows using a set of modular components. We call these components PipelineItems (subclassed into Readers, Processors, and Writers). A pipeline uses a sequence of PipelineItems to execute a data processing workflow where the data returned by one PipelineItem becomes input for the following ones.
Many PipelineItems can be shared by data processing workflows for multiple different X-ray techniques, while others may be unique to just a single technique. The PipelineItems that are shared by many techniques are organized in the CHAP.common subpackage. PipelineItems unique to a tomography workflow, for instance, are organized in the CHAP.tomo subpackage.
The CHAP.utils subpackage contains a broad selection of utilities to assist with common tasks that appear in specific PipelineItem implementations.
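The chaining described above can be sketched in plain Python. This is only a minimal illustration and not CHAP's implementation: the classes ToyReader, ToyProcessor, and ToyWriter and the function run_pipeline are hypothetical stand-ins, and the sketch assumes each item simply receives the list of results accumulated so far.

```python
"""Toy illustration of a reader -> processor -> writer chain.

All names here are hypothetical stand-ins, not CHAP classes.
"""


class ToyReader:
    """Provide some data to the pipeline."""

    def execute(self, data):
        return {"name": "ToyReader", "data": [1, 2, 3]}


class ToyProcessor:
    """Transform the most recent result."""

    def execute(self, data):
        latest = data[-1]["data"]
        return {"name": "ToyProcessor", "data": [2 * v for v in latest]}


class ToyWriter:
    """Render the most recent result as text (no file I/O in this sketch)."""

    def execute(self, data):
        latest = data[-1]["data"]
        return {"name": "ToyWriter", "data": ", ".join(map(str, latest))}


def run_pipeline(items):
    """Run the items in order, handing each one the results so far."""
    results = []
    for item in items:
        results.append(item.execute(results))
    return results


results = run_pipeline([ToyReader(), ToyProcessor(), ToyWriter()])
print(results[-1]["data"])
```

Keeping every intermediate result in one list is a choice made for this sketch; it mirrors the idea that later items can look back at what earlier items returned.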
Submodules summary
- models
Common Pydantic model classes.
- pipeline
Base pipeline Pydantic model classes.
- processor
Module defining the base Processor class to derive all others from.
- reader
Module defining the base Reader class to derive all others from.
- runner
Main functions to execute a ChessAnalysisPipeline (CHAP).
- server
Python server with thread pool and CHAP pipeline.
- taskmanager
Python thread pool.
- writer
Module defining the base Writer class to derive all others from.
Subpackages
- CHAP.common package
- Submodules summary
- Subpackages
- Submodules
- CHAP.common.map_utils module
- CHAP.common.nexus_utils module
- CHAP.common.processor module
AsyncProcessor, BinarizeProcessor, ConstructBaseline, ConvertStructuredProcessor, ExpressionProcessor, ImageProcessor, MPICollectProcessor, MPIMapProcessor, MPISpawnMapProcessor, MapProcessor, NXdataToDataPointsProcessor, NexusToNumpyProcessor, NexusToXarrayProcessor, NexusToZarrProcessor, NormalizeMapProcessor, NormalizeNexusProcessor, NumpyStackProcessor, NumpySumProcessor, NumpyToNXfieldProcessor, PandasToXarrayProcessor, PrintProcessor, PyfaiAzimuthalIntegrationProcessor, RawDetectorDataMapProcessor, SetupNXdataProcessor, UnstructuredToStructuredProcessor, UpdateNXdataProcessor, UpdateNXvalueProcessor, XarrayToNexusProcessor, XarrayToNumpyProcessor, ZarrToNexusProcessor
- CHAP.common.reader module
- CHAP.common.utils module
- CHAP.common.writer module
ExtractArchiveWriter, FileTreeWriter, H5Writer, ImageWriter, MatplotlibAnimationWriter, MatplotlibFigureWriter, NexusValuesWriter, NexusWriter, PyfaiResultsWriter, TXTWriter, YAMLWriter, ZarrValuesWriter, ZarrWriter, validate_model(), write_filetree(), write_matplotlibfigure(), write_nexus(), write_tif(), write_txt(), write_yaml()
- CHAP.edd package
- Submodules summary
- Submodules
- CHAP.edd.models module
BaselineConfig, DiffractionVolumeLengthConfig (max_energy_kev, measurement_mode, model_config, model_post_init(), sample_thickness, sigma_to_dvl_factor, validate_diffractionvolumelengthconfig_after())
FitConfig (background, backgroundpeaks, baseline, centers_range, energy_mask_ranges, fwhm_max, fwhm_min, mask_ranges, model_config, validate_background(), validate_baseline(), validate_energy_mask_ranges(), validate_mask_ranges())
MCACalibrationConfig (flux_correction_interpolation_function(), flux_file, flux_file_energy_range(), materials, model_config, peak_energies, scan_step_indices, validate_mcacalibrationconfig_before(), validate_scan_step_indices())
MCADetectorCalibration (convert_mask_ranges(), energies, energy_calibration_coeffs, get_mask_ranges(), hkl_indices, mca_mask(), model_config, model_post_init(), num_bins, processor_type, set_energy_calibration_mask_ranges(), tth_calibrated, tth_initial_guess, tth_max, tth_tol)
MCADetectorConfig, MCADetectorDiffractionVolumeLength (dvl, fit_amplitude, fit_center, fit_sigma, model_config, model_post_init(), processor_type)
MCADetectorStrainAnalysis (add_calibration(), centers_range, fwhm_max, fwhm_min, get_calibration_mask_ranges(), get_tth_map(), model_config, model_post_init(), peak_models, processor_type, rel_height_cutoff, tth_map, validate_peak_models())
MCAEnergyCalibrationConfig (max_energy_kev, max_peak_index, model_config, validate_max_peak_index(), validate_mcaenergycalibrationconfig_after(), validate_mcaenergycalibrationconfig_before())
MCATthCalibrationConfig, MaterialConfig, StrainAnalysisConfig
- CHAP.edd.processor module
DiffractionVolumeLengthProcessor (config, detector_config, model_config, model_post_init(), pipeline_fields, process(), validate_diffractionvolumelengthprocessor_after(), validate_diffractionvolumelengthprocessor_before())
HKLProcessor, LatticeParameterRefinementProcessor (config, detector_config, model_config, model_post_init(), pipeline_fields, process(), validate_latticeparameterrefinementprocessor_before())
MCAEnergyCalibrationProcessor (config, detector_config, model_config, model_post_init(), pipeline_fields, process(), validate_mcaenergycalibrationprocessor_before())
MCATthCalibrationProcessor (config, detector_config, model_config, model_post_init(), pipeline_fields, process(), validate_mcatthcalibrationprocessor_before())
ReducedDataProcessor, StrainAnalysisProcessor (add_points(), config, detector_config, model_config, model_post_init(), pipeline_fields, process(), setup, update, validate_strainanalysisprocessor_before())
- CHAP.edd.reader module
- CHAP.edd.select_material_params_gui module
- CHAP.edd.utils module
- CHAP.edd.writer module
- CHAP.foxden package
- Submodules summary
- Submodules
- CHAP.foxden.models module
- CHAP.foxden.processor module
- CHAP.foxden.reader module
- CHAP.foxden.utils module
- CHAP.foxden.writer module
- CHAP.giwaxs package
- Submodules summary
- Submodules
- CHAP.giwaxs.models module
AzimuthalIntegratorConfig, FiberIntegratorConfig, GiwaxsConversionConfig, Integrate1dConfig, Integrate2dConfig, Integrate2dGIConfig (ais, attrs, method, model_config, npt_ip, npt_oop, sample_orientation, unit_ip, unit_oop, validate_sample_orientation(), validate_unit_ip(), validate_unit_oop())
IntegratorConfig, MultiGeometryConfig, PyfaiIntegrationConfig, PyfaiIntegratorConfig (integrate(), integration_method, integration_params, model_config, multi_geometry, name, right_handed, validate_pyfaiintegratorconfig_after(), validate_pyfaiintegratorconfig_before())
- CHAP.giwaxs.processor module
GiwaxsConversionProcessor (config, model_config, model_post_init(), nxmemory, nxpath, pipeline_fields, process(), save_figures)
PyfaiIntegrationProcessor
- CHAP.giwaxs.reader module
- CHAP.giwaxs.writer module
- CHAP.saxswaxs package
- Submodules summary
- Submodules
- CHAP.saxswaxs.processor module
CfProcessor, FluxAbsorptionBackgroundCorrectionProcessor, FluxAbsorptionCorrectionProcessor, FluxCorrectionProcessor, PyfaiIntegrationProcessor, SetupProcessor (dataset_chunks, dataset_shape, detectors, map_config, model_config, model_post_init(), num_chunk, pipeline_fields, process(), pyfai_config, raw_data)
SetupResultsProcessor, UnstructuredToStructuredProcessor (get_common_axes(), model_config, model_post_init(), process(), structure_signal_values(), validate_config_fields(), validate_data())
UpdateValuesProcessor (detectors, map_config, model_config, model_post_init(), pipeline_fields, process(), pyfai_config, raw_data, scan_number, spec_file)
- CHAP.saxswaxs.reader module
- CHAP.saxswaxs.writer module
- CHAP.tomo package
- Submodules summary
- Submodules
- CHAP.tomo.models module
Detector, TomoCombineConfig, TomoFindCenterConfig (center_offset_max, center_offset_min, center_offsets, center_rows, center_search_range, center_stack_index, gaussian_sigma, model_config, ring_width)
TomoReconstructConfig, TomoReduceConfig, TomoSimConfig
- CHAP.tomo.processor module
NUM_CORE_TOMOPY_LIMIT, SetNumexprThreads, TomoBrightFieldProcessor, TomoCHESSMapConverter, TomoCombineProcessor, TomoDarkFieldProcessor, TomoFindCenterGui (center_offsets, center_rows, center_stack_index, gaussian_sigma, img_column_bounds, img_row_bounds, model_config, model_post_init(), num_center_rows, num_proc, recon_planes, reconstruct_planes(), ring_width, tbf, thetas, tk_root, tomo_stacks)
TomoFindCenterProcessor, TomoMetadataProcessor, TomoReconstructProcessor (center_config, config, model_config, model_post_init(), num_proc, nxmemory, pipeline_fields, process(), save_figures)
TomoReduceProcessor, TomoSimFieldProcessor, TomoSpecProcessor, create_metadata_provenance(), read_metadata_provenance()
- CHAP.tomo.reader module
- CHAP.tomo.writer module
- CHAP.utils package
- Submodules summary
- Submodules
- CHAP.utils.converters module
- CHAP.utils.fit module
Component, Components, Fit (add_model(), add_parameter(), best_errors, best_fit, best_parameters, best_values, best_vary, chisqr, components, covar, eval(), fit(), guess_init_peak(), init_parameters, init_values, normalization_offset, num_func_eval, parameters, plot(), print_fit_report(), redchi, residual, success, var_names, x, y)
FitMap (best_errors, best_fit, best_parameters(), best_values, best_vary, chisqr, components, covar, fit(), freemem(), max_nfev, num_func_eval, out_of_bounds, plot(), redchi, residual, success, var_names, y, ymap)
FitProcessor, ModelResult, Parameters
- CHAP.utils.general module
all_any(), almost_equal(), assert_no_duplicate_attr_in_list_of_objs(), assert_no_duplicate_key_in_list_of_dicts(), assert_no_duplicates_in_list_of_dicts(), baseline_arPLS(), depth_list(), depth_tuple(), dictionary_update(), fig_to_iobuf(), file_exists_and_readable(), get_consecutive_int_range(), get_default_path(), get_trailing_int(), getfloat_attr(), gformat(), illegal_combination(), illegal_value(), index_nearest(), index_nearest_down(), index_nearest_up(), input_int(), input_int_list(), input_menu(), input_num(), input_num_list(), input_yesno(), is_dict_nums(), is_dict_series(), is_dict_strings(), is_index(), is_index_range(), is_int(), is_int_pair(), is_int_series(), is_num(), is_num_pair(), is_num_series(), is_str_or_str_series(), is_str_series(), list_dictionary_update(), list_to_string(), not_zero(), nxcopy(), quick_imshow(), quick_plot(), range_string_ge_gt_le_lt(), rolling_average(), round_to_n(), round_up_to_n(), save_iobuf_fig(), select_image_indices(), select_mask_1d(), select_roi_1d(), select_roi_2d(), string_to_list(), test_ge_gt_le_lt(), trunc_to_n(), unwrap_tuple()
- CHAP.utils.material module
- CHAP.utils.models module
Constant, Exponential, Expression, FitConfig, FitParameter (default, expr, init_value, max, min, model_config, model_post_init(), name, prefix, set(), stderr, validate_max(), validate_min(), value, vary)
Gaussian, Linear, Lorentzian, Multipeak, PseudoVoigt, Quadratic, Rectangle, constant(), exponential(), gaussian(), linear(), lorentzian(), pvoigt(), quadratic(), rectangle(), validate_parameters()
- CHAP.utils.parfile module
Submodules
CHAP.models module
Common Pydantic model classes.
- class CHAPBaseModel[source]
Bases: BaseModel
Base CHAP configuration class implementing robust serialization tools.
- dict(*args, **kwargs)[source]
Dump the class implementation to a dictionary.
- Parameters:
**kwargs – Arbitrary keyword arguments.
exclude (dict or set, optional) – Class variable(s) to omit from the output dictionary.
by_alias (bool, optional) – Use aliases as the output dictionary keys for class variables that have an alias, defaults to True.
- Type:
dict
- Returns:
Class implementation.
- Return type:
dict
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_dump(*args, **kwargs)[source]
Dump the class implementation to a dictionary.
- Parameters:
**kwargs – Arbitrary keyword arguments.
exclude (dict or set, optional) – Class variable(s) to omit from the output dictionary.
by_alias (bool, optional) – Use aliases as the output dictionary keys for class variables that have an alias, defaults to True.
- Type:
dict
- Returns:
Class implementation.
- Return type:
dict
- model_dump_json(*args, **kwargs)[source]
Dump the class implementation to a JSON string.
- Parameters:
**kwargs – Arbitrary keyword arguments.
exclude (dict or set, optional) – Class variable(s) to omit from the output dictionary.
by_alias (bool, optional) – Use aliases as the output dictionary keys for class variables that have an alias, defaults to True.
- Type:
dict
- Returns:
Class implementation.
- Return type:
str
- class RunConfig(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]
Bases: CHAPBaseModel
Pipeline run configuration class.
- Variables:
root (str, optional) – Default work directory, defaults to the current run directory.
inputdir (str, optional) – Input directory, used only if any input file in the pipeline is not an absolute path, defaults to ‘root’.
outputdir (str, optional) – Output directory, used only if any output file in the pipeline is not an absolute path, defaults to ‘root’.
interactive (bool, optional) – Allows for user interactions, defaults to False.
log_level (Literal[ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], optional) – Logger level (not case sensitive), defaults to ‘INFO’.
- inputdir: Annotated[Path, PathType(path_type=dir)] | None
- interactive: bool | None
- log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Args:
self: The BaseModel instance. context: The context.
- outputdir: Annotated[Path, PathType(path_type=dir)] | None
- property profile
Return the profiling flag.
- Type:
bool
- root: Annotated[Path, PathType(path_type=dir)] | None
- property spawn
Return the spawned worker flag.
- Type:
int
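The way inputdir and outputdir fall back to root, and apply only to paths that are not absolute, can be illustrated with a small standard-library sketch. The helper resolve_path is hypothetical and only mirrors the behaviour as described in the variable list above; it is not taken from the CHAP source. PurePosixPath is used so the example behaves the same on every platform.

```python
from pathlib import PurePosixPath


def resolve_path(filename, root, directory=None):
    """Resolve filename following the RunConfig description.

    Assumption: absolute paths are used unchanged; relative paths are
    joined to `directory`, which falls back to `root` when unset.
    """
    path = PurePosixPath(filename)
    if path.is_absolute():
        return path
    base = PurePosixPath(directory) if directory is not None else PurePosixPath(root)
    return base / path


root = "/work"
relative_default = resolve_path("scan.nxs", root)
relative_inputdir = resolve_path("scan.nxs", root, "/data/raw")
absolute = resolve_path("/abs/scan.nxs", root, "/data/raw")
print(relative_default, relative_inputdir, absolute)
```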
CHAP.pipeline module
Base pipeline Pydantic model classes.
- class Pipeline[source]
Bases: CHAPBaseModel
Class representing a full Pipeline object.
- Variables:
args (list[dict]) – List of PipelineItem arguments for each item in the full pipeline.
logger (logging.Logger, optional) – CHAP logger.
mmcs (list[pydantic._internal._model_construction.ModelMetaclass]) – List of PipelineItem classes in the full pipeline.
- args: Annotated[list[dict], Len(min_length=1, max_length=None)]
- execute()[source]
Executes the pipeline.
- Returns:
List of PipelineData items after pipeline execution.
- Return type:
list[PipelineData]
- logger: Logger | None
- mmcs: Annotated[list[ModelMetaclass], Len(min_length=1, max_length=None)]
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Args:
self: The BaseModel instance. context: The context.
- class PipelineData(name=None, data=None, schema=None)[source]
Bases: dict
Wrapper for all results of PipelineItem.execute.
- class PipelineItem(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]
Bases: RunConfig
Class representing a single item in a Pipeline object.
- Variables:
logger (logging.Logger, optional) – CHAP logger.
name (str, optional) – Pipeline object name.
schema (str, optional) – Pipeline object schema.
- execute(data)[source]
Execute the appropriate method of the object and return the result.
- Parameters:
data (list[PipelineData]) – Input data.
- Returns:
Wrapped result of executing read, process, or write.
- Return type:
PipelineData | tuple[PipelineData]
- get_config(data=None, config=None, schema=None, remove=True)[source]
Look through data for the last item whose value for the ‘schema’ key matches schema. Convert the value for that item’s ‘data’ key into the configuration’s Pydantic model identified by schema and return it. If no item is found and both config and schema are specified, validate config against the configuration’s Pydantic model identified by schema and return it. If no item is found and config is specified but schema is not, return config.
- Parameters:
data (list[PipelineData], optional) – Input data.
config (dict, optional) – Initialization parameters for an instance of the Pydantic model identified by schema. Required if data is unspecified, invalid, or does not contain an item that matches the schema; supersedes any equal parameters contained in data.
schema (str, optional) – Schema of the PipelineItem class to match in data, defaults to the internal PipelineItem schema attribute.
remove (bool, optional) – If there is a matching entry in data, remove it from the list, defaults to True.
- Raises:
ValueError – If there’s no match for schema in data.
- Returns:
Last matching validated configuration model.
- Return type:
- static get_data(data, name=None, schema=None, remove=True)[source]
Look through data for the last item whose ‘data’ value is a NeXus style NXobject object or that matches a given name or schema. Pick the last item for which the ‘name’ key matches name if set, or the ‘schema’ key matches schema if set; otherwise pick the last item holding an NXobject object. Return the data object.
- Parameters:
data (list[PipelineData]) – Input data.
name (str, optional) – Name of the PipelineItem class to match in data.
schema (str | list[str], optional) – Schema of the PipelineItem class to match in data & return.
remove (bool, optional) – If there is a matching entry in data, remove it from the list, defaults to True.
- Raises:
ValueError – If there’s no match for name or schema in data, or if there is no object of type nexusformat.nexus.NXobject.
- Returns:
Last matching data item.
- Return type:
Any
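The "last match wins" lookup described for get_data can be sketched with plain dictionaries. The function get_last_match and the schema string "example.schema" are hypothetical, the NXobject fallback is left out, and raising ValueError when neither name nor schema is given is a simplification of this sketch rather than documented behaviour.

```python
def get_last_match(data, name=None, schema=None, remove=True):
    """Return the 'data' value of the last item matching name or schema.

    Hypothetical helper: `data` is a list of dicts with 'name',
    'schema', and 'data' keys. NXobject matching is omitted.
    """
    if name is not None:
        key, wanted = "name", [name]
    elif schema is not None:
        key = "schema"
        wanted = [schema] if isinstance(schema, str) else list(schema)
    else:
        raise ValueError("this sketch needs a name or a schema")
    # Walk the list backwards so the most recent match wins.
    for index in range(len(data) - 1, -1, -1):
        if data[index].get(key) in wanted:
            item = data.pop(index) if remove else data[index]
            return item["data"]
    raise ValueError(f"no match for {key}={wanted!r}")


data = [
    {"name": "A", "schema": "example.schema", "data": 1},
    {"name": "B", "schema": "example.schema", "data": 2},
    {"name": "C", "schema": None, "data": 3},
]
value = get_last_match(data, schema="example.schema")
remaining = [item["name"] for item in data]
print(value, remaining)
```

With remove=True the matched entry is taken out of the list, so later pipeline items no longer see it.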
- static get_default_nxentry(nxobject)[source]
Given a NeXus style NXroot object or a NeXus style NXentry object, return the default or first NXentry match.
- Parameters:
nxobject (nexusformat.nexus.NXroot | nexusformat.nexus.NXentry) – Input data.
- Raises:
ValueError – If unable to retrieve a NXentry object.
- Returns:
Input data if a NXentry object or the default or first NXentry object if a NXroot object.
- Return type:
nexusformat.nexus.NXentry
- static get_nxroot(nxobject)[source]
Given a NeXus style NXroot object or a NeXus style NXentry object, return a NXroot object with the appropriate default path to the NXentry object set.
- Parameters:
nxobject (nexusformat.nexus.NXroot | nexusformat.nexus.NXentry) – Input data.
- Raises:
ValueError – If unable to retrieve a NXroot or NXentry object.
- Returns:
The input data if it is a NXroot object; otherwise a NXroot object with the input as its default NXentry object.
- Return type:
nexusformat.nexus.NXroot
- static get_pipelinedata_item(data, index=-1, remove=False)[source]
If data is a list, retrieve the list item matching index and return its ‘data’ value; otherwise return data itself.
- Parameters:
data (Any | list[PipelineData]) – Input data.
index (int, optional) – List index of the item to retrieve from data, defaults to -1 (the last item in the list).
remove (bool, optional) – If there is a matching entry in data, remove it from the list, defaults to False.
- Returns:
Matching data item.
- Return type:
Any
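A minimal sketch of this behaviour, assuming list items are dict-like with a ‘data’ key. The function get_item is a hypothetical stand-in and may differ from the real method in edge cases such as empty lists.

```python
def get_item(data, index=-1, remove=False):
    """Return the 'data' value at index if data is a list, else data."""
    if isinstance(data, list):
        item = data.pop(index) if remove else data[index]
        return item["data"]
    return data


wrapped = [{"name": "first", "data": "a"}, {"name": "second", "data": "b"}]
last_value = get_item(wrapped)
first_value = get_item(wrapped, index=0)
passthrough = get_item(42)
print(last_value, first_value, passthrough)
```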
- has_filename()[source]
Check whether the PipelineItem has a filename class attribute.
- Returns:
True if the PipelineItem has a filename class attribute.
- Return type:
bool
- logger: Logger | None
- property method
Return the PipelineItem’s read, process, or write method.
- Type:
types.MethodType
- property method_type
Return the PipelineItem’s execute method type.
- Type:
Literal[‘read’, ‘process’, ‘write’]
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Args:
self: The BaseModel instance. context: The context.
- name: Annotated[str, StringConstraints(strip_whitespace=True, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=None, pattern=None, ascii_only=None)] | None
- schema_: Annotated[str, StringConstraints(strip_whitespace=True, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=None, pattern=None, ascii_only=None)] | None
- set_args(**args)[source]
Set the PipelineItem’s execution method run time arguments that are allowed by its method declaration.
- Param:
The PipelineItem’s execution method run time arguments.
- Type:
dict
- property status
Return the PipelineItem’s status.
- Type:
Literal[‘read’, ‘write_pending’, ‘written’]
- static unwrap_pipelinedata(data)[source]
Given a list of PipelineData objects, return a list of their data values.
- Parameters:
data (list[PipelineData]) – Input data to read, write, or process that needs to be unwrapped from PipelineData before use.
- Returns:
‘data’ values of the items in the input data.
- Return type:
list
CHAP.processor module
Module defining the base Processor class to derive all others from.
- class Processor(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]
Bases: PipelineItem
Base processor.
The job of any Processor in a pipeline is to receive data returned by a previous PipelineItem, process it in some way, and return the result for the following PipelineItems to use.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Args:
self: The BaseModel instance. context: The context.
- process(data)[source]
Extract the contents of the input data, add a string to it, and return the amended value.
- Parameters:
data (list[PipelineData]) – Input data.
- Returns:
Processed data.
- Return type:
str
- main(opt_parser=<class 'CHAP.processor.OptionParser'>)[source]
Main function.
- Parameters:
opt_parser (OptionParser) – User based option parser.
CHAP.reader module
Module defining the base Reader class to derive all others from.
- class Reader(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]
Bases: PipelineItem
Base reader.
The job of any Reader in a pipeline is to provide data stored in a file to the list of PipelineItems.
- Variables:
filename (str) – Name of file to read from.
- filename: Annotated[str, StringConstraints(strip_whitespace=True, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=None, pattern=None, ascii_only=None)]
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Args:
self: The BaseModel instance. context: The context.
CHAP.runner module
Main functions to execute a ChessAnalysisPipeline (CHAP).
- batch_runner(run_config, pipeline_config, log_file)[source]
Function for running a pipeline in batch mode with logging to file. Essentially a wrapper for the runner() function.
- Parameters:
run_config (RunConfig) – CHAP run configuration.
pipeline_config (dict) – CHAP Pipeline configuration.
log_file – Name of file for logging.
- parser()[source]
Return an argument parser for the CHAP command line interface (CLI). This parser accepts one required argument, the input CHAP configuration file name, and several optional arguments. Execute:
$ CHAP --help
from the command line for a description on how to use CHAP.
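The general shape of such a parser, one required positional argument plus optional flags, can be sketched with argparse. This is a generic illustration and not the CHAP parser: the program name example-cli and the options --log-level and --interactive are hypothetical and were chosen only because RunConfig has fields with similar names. The real options are listed by CHAP --help.

```python
import argparse


def build_parser():
    """Build a parser with one required positional and optional flags.

    All option names here are hypothetical placeholders.
    """
    parser = argparse.ArgumentParser(prog="example-cli")
    parser.add_argument("config", help="input configuration file name")
    parser.add_argument(
        "--log-level",
        default="INFO",
        type=str.upper,  # accept any letter case, compare in upper case
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="logger level (not case sensitive)",
    )
    parser.add_argument("--interactive", action="store_true")
    return parser


args = build_parser().parse_args(["pipeline.yaml", "--log-level", "debug"])
print(args.config, args.log_level, args.interactive)
```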
- run(run_config, pipeline_config, logger=None, log_handler=None, comm=None)[source]
Run a given pipeline_config.
- Parameters:
run_config (RunConfig) – CHAP run configuration.
pipeline_config (dict) – CHAP Pipeline configuration.
logger (logging.Logger, optional) – Logger.
log_handler (logging.StreamHandler, optional) – Logging handler.
comm (mpi4py.MPI.Comm, optional) – MPI communicator.
- Returns:
data field of the first item in the returned list of pipeline items.
CHAP.server module
Python server with thread pool and CHAP pipeline.
Client side
cat /tmp/chap.json
{"pipeline": [{"common.PrintProcessor": {}}], "input": 1}
Curl call to the server with CHAP pipeline
curl -X POST -H "Content-type: application/json" -d@/tmp/chap.json http://localhost:5000/pipeline
{"pipeline": [{"common.PrintProcessor": {}}], "status": "ok"}
Server side
flask --app server run
Serving Flask app ‘server’
Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
Running on http://127.0.0.1:5000
Press CTRL+C to quit
CHAP output:
CHAP.server : Call pipeline args=() kwds={'pipeline': [{'common.PrintProcessor': {}}]}
CHAP.server : pipeline [{'common.PrintProcessor': {}}]
CHAP.server : Loaded <CHAP.common.processor.PrintProcessor object at 0x10e0f1ed0>
CHAP.server : Loaded <CHAP.pipeline.Pipeline object at 0x10e0f1f10> with 1 items
CHAP.server : Calling "execute" on <CHAP.pipeline.Pipeline object at 0x10e0f1f10>
Pipeline : Executing "execute"
Pipeline : Calling "process" on <CHAP.common.processor.PrintProcessor object at 0x10e0f1ed0>
PrintProcessor : Executing "process" with type(data)=<class 'NoneType'>
PrintProcessor data : None
PrintProcessor : Finished "process" in 0.000 seconds
Pipeline : Executed "execute" in 0.000 seconds
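The same client call can be prepared from Python with the standard library. The sketch below only builds the request and does not send it, so it runs without a server. The URL and payload are copied from the curl example, while the helper name build_pipeline_request is hypothetical.

```python
import json
import urllib.request


def build_pipeline_request(url, payload):
    """Build (but do not send) a JSON POST request carrying payload."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-type": "application/json"},
        method="POST",
    )


payload = {"pipeline": [{"common.PrintProcessor": {}}], "input": 1}
request = build_pipeline_request("http://localhost:5000/pipeline", payload)
print(request.get_method(), request.full_url)

# To actually send it (requires the server to be running):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```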
CHAP.taskmanager module
Python thread pool, see http://code.activestate.com/recipes/577187-python-thread-pool
- class StoppableThread(target, name, args)[source]
Bases: Thread
Thread class with a stop() method. The thread itself has to check regularly for the stopped() condition.
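The cooperative stop pattern described here is commonly built on threading.Event. The class below is a hypothetical sketch of that pattern and not the CHAP implementation; the name StoppableWorker and its polling loop are assumptions made for illustration.

```python
import threading


class StoppableWorker(threading.Thread):
    """Thread that loops until stop() is called (hypothetical sketch)."""

    def __init__(self):
        super().__init__(daemon=True)
        self._stop_event = threading.Event()
        self.iterations = 0

    def stop(self):
        """Ask the thread to finish."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()

    def run(self):
        # The thread itself must poll the flag; nothing interrupts it.
        while not self.stopped():
            self.iterations += 1
            self._stop_event.wait(0.01)


worker = StoppableWorker()
worker.start()
worker.stop()
worker.join(timeout=1.0)
print(worker.is_alive())
```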
- class TaskManager(nworkers=10, name='TaskManager')[source]
Bases: object
Task manager class based on the thread module which executes assigned tasks concurrently. It uses a pool of thread workers, a queue of tasks, and a pid set to monitor job execution.
Use case:
    mgr = TaskManager()
    jobs = []
    jobs.append(mgr.spawn(func, args))
    mgr.joinall(jobs)
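For comparison, the same spawn-then-join flow can be written with the standard library's concurrent.futures. This swaps in ThreadPoolExecutor in place of TaskManager and is not the CHAP class: submit() and wait() play roughly the roles of spawn() and joinall(), and the function square is a placeholder task.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def square(value):
    """Placeholder task."""
    return value * value


# ThreadPoolExecutor stands in for the task manager in this sketch.
with ThreadPoolExecutor(max_workers=10) as pool:
    jobs = [pool.submit(square, n) for n in range(5)]
    wait(jobs)  # block until every submitted job has finished
    results = [job.result() for job in jobs]

print(results)
```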
- class Worker(name, taskq, pidq, uidq, *, logger=None)[source]
Bases: Thread
Thread executing worker from a given tasks queue.
CHAP.writer module
Module defining the base Writer class to derive all others from.
- class Writer(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]
Bases: PipelineItem
Base writer.
The job of any Writer in a pipeline is to receive input returned by a previous PipelineItem and write its data to a file in a particular file format.
- Variables:
filename (str) – Name of file to write to.
force_overwrite (bool, optional) – Flag to allow data in filename to be overwritten if it already exists, defaults to False.
remove (bool, optional) – Flag to remove the dictionary from data, defaults to False.
- filename: str
- force_overwrite: bool | None
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Args:
self: The BaseModel instance. context: The context.
- remove: bool | None
- write(data)[source]
Write the last PipelineData item in data as text to a file.
- Parameters:
data (list[PipelineData]) – Input data.
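The role of the force_overwrite flag can be illustrated with a small standard-library sketch. The helper write_text is hypothetical, and raising FileExistsError when the file exists is this sketch's own choice; the error CHAP raises in that case is not stated here.

```python
import os
import tempfile


def write_text(text, filename, force_overwrite=False):
    """Write text to filename, refusing to clobber an existing file.

    Hypothetical helper; the exception type is an assumption.
    """
    if os.path.exists(filename) and not force_overwrite:
        raise FileExistsError(f"{filename} already exists")
    with open(filename, "w", encoding="utf-8") as handle:
        handle.write(text)


with tempfile.TemporaryDirectory() as tmpdir:
    target = os.path.join(tmpdir, "out.txt")
    write_text("first", target)
    try:
        write_text("second", target)
        refused = False
    except FileExistsError:
        refused = True
    write_text("third", target, force_overwrite=True)
    with open(target, encoding="utf-8") as handle:
        final_text = handle.read()

print(refused, final_text)
```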