Source code for CHAP.foxden.processor

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module for Processors unique to the
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
integration with CHAP.

Add description of FOXDEN
"""

# System modules
import os
from typing import (
    Literal,
    Optional,
)
# Third party modules
from pydantic import conint

# Local modules
from CHAP.processor import Processor


#class FoxdenMetadataProcessor(Processor):
#    """Processor to collect CHAP workflow metadata from a workflow
#    NeXus output object.
#    """
#    def process(self, data):
#        """Extract metadata from a workflow NeXus output object for
#        submission to the FOXDEN Metadata service.
#
#        :param data: Input data.
#        :type data: list[PipelineData]
#        :return: CHAP workflow metadata record.
#        :rtype: dict
#        """
#        # Third party modules
#        from json import loads
#        from nexusformat.nexus import (
#            NXentry,
#            NXroot,
#        )
#
#        # Load and validate the workflow NeXus output object
#        nxentry = self.get_data(data, remove=False)
#        if isinstance(nxentry, NXroot):
#            nxentry = nxentry[nxentry.default]
#        if not isinstance(nxentry, NXentry):
#            raise ValueError(f'Invalid input data type {type(nxentry)}')
#
#        # Get did and experiment type
#        map_config = loads(str(nxentry.map_config))
#        did = map_config['did']
#        experiment_type = map_config['experiment_type']
#
#        # Extract metadata
#        method = getattr(self, f'_get_metadata_{experiment_type.lower()}')
#        metadata = method(nxentry)
#
#        if 'reconstructed_data' in metadata:
#            did = f'{did}/{experiment_type.lower()}_reconstructed'
#        else:
#            did = f'{did}/{experiment_type.lower()}_reduced'
#        return {'did': did, 'application': 'CHAP', 'metadata': metadata}
#
#    def _get_metadata_tomo(self, nxentry):
#        metadata = {}
#        if 'reduced_data' in nxentry:
#            data = nxentry.reduced_data
#            metadata.update({
#                'reduced_data': {
#                    'date': str(data.date),
#                    'img_row_bounds': data.img_row_bounds.tolist(),
#                }
#            })
#        if 'reconstructed_data' in nxentry:
#            data = nxentry.reconstructed_data
#            metadata.update({
#                'reconstructed_data': {
#                    'date': str(data.date),
#                    'center_offsets': data.center_offsets.tolist(),
#                    'center_rows': data.center_offsets.tolist(),
#                    'center_stack_index': int(data.center_stack_index),
#                    'x_bounds': data.x_bounds.tolist(),
#                    'y_bounds': data.y_bounds.tolist(),
#                }
#            })
#        if 'combined_data' in nxentry:
#            data = nxentry.combined_data
#            metadata.update({
#                'combined_data': {
#                    'date': str(data.date),
#                }
#            })
#        return metadata


#class FoxdenProvenanceProcessor(Processor):
#    """Processor to collect CHAP workflow provenance data."""
#    def process(self, data):
#        """Extract provenance data from the pipeline data for
#        submission to the FOXDEN Provenance service.
#
#        :param data: Input data.
#        :type data: list[PipelineData]
#        :return: CHAP workflow provenance record.
#        :rtype: dict
#        """
#        # Local modules
#        from CHAP.common.utils import (
#            osinfo,
#            environments,
#        )
#        # Load the provenance info
#        provenance = self.get_data(data, schema='provenance')
#
#        # Add system info to provenance data
#        provenance.update({
#            'environments': environments(),
#            'osinfo': osinfo(),
#            'processing': 'CHAP pipeline',
#            'scripts': [
#                {'name': 'CHAP', 'parent_script': None, 'order_idx': 1}],
#            'site': 'Cornell',
#        })
#
#        return provenance


class ProvenanceFileProcessor(Processor):
    """A Processor that retrieves a
    `FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
    provenance record from the pipeline and returns the content of
    the input or output file listed in the record.

    :ivar file_type: The `'file_type'` of the file to select from the
        provenance record, defaults to `'output'`.
    :vartype file_type: Literal['input', 'output'], optional
    :ivar nxmemory: Maximum memory usage when reading NeXus files,
        ignored for any other file type.
    :vartype nxmemory: int, optional
    """
    file_type: Optional[Literal['input', 'output']] = 'output'
    nxmemory: Optional[conint(gt=0)] = None

    def process(self, data):
        """Return the content of the input or output file listed in
        the provenance record.

        The provenance record is read from the pipeline data without
        removing it, the single entry whose `'file_type'` matches
        `self.file_type` is selected, and the file is read with the
        reader matching its extension (`nxs`, `yml` or `yaml`).

        :param data: Input pipeline data, forwarded as is to
            `CHAP.tomo.processor.read_metadata_provenance`.
        :type data: list[PipelineData]
        :raises ValueError: If the provenance record lists no file or
            more than one file of the requested type, or if the file
            extension is not supported.
        :return: The file content.
        :rtype: Any
        """
        # Local modules
        from CHAP.tomo.processor import read_metadata_provenance

        # An explicit None falls back to the documented default, which
        # is also what the previous hard-coded filter selected
        file_type = self.file_type or 'output'

        _, provenance = read_metadata_provenance(
            data, logger=self.logger, remove=False)
        filenames = [
            v['name'] for v in provenance if v['file_type'] == file_type]
        if not filenames:
            raise ValueError(
                f'Unable to get an {file_type} file name from '
                f'provenance ({provenance})')
        if len(filenames) > 1:
            raise ValueError(
                f'Unable to get a unique {file_type} file name '
                f'from provenance ({provenance})')
        filename = filenames[0]

        # FIX modify CHAP.reader to be a generic reader, based on ext
        # Can use __import__ as well
        ext = os.path.splitext(filename)[1][1:]
        if ext == 'nxs':
            # Local modules
            from CHAP.common.reader import NexusReader

            reader = NexusReader(filename=filename, **self.model_dump())
        elif ext in ('yml', 'yaml'):
            # Local modules
            from CHAP.common.reader import YAMLReader

            reader = YAMLReader(filename=filename, **self.model_dump())
        else:
            raise ValueError(
                'ProvenanceFileProcessor not yet implemented for '
                f'files with extension {ext}')
        return reader.read()
if __name__ == '__main__':
    # Delegate to CHAP.processor.main when this module is executed
    # as a script rather than imported
    from CHAP.processor import main as run_main

    run_main()