Source code for CHAP.pipeline

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Base pipeline `Pydantic <https://github.com/pydantic/pydantic>`__
model classes.
"""

# System modules
import logging
from time import time
from types import MethodType
from typing import (
    Literal,
    Optional,
)

# Third party modules
from pydantic import (
    Field,
#    FilePath,
    PrivateAttr,
    conlist,
    constr,
    model_validator,
)
from pydantic._internal._model_construction import ModelMetaclass

# Local modules
from CHAP.models import (
    CHAPBaseModel,
    RunConfig,
)


class PipelineData(dict):
    """Wrapper for all results of PipelineItem.execute.

    A plain `dict` populated with exactly three keys, inserted in the
    order `'name'`, `'data'` and `'schema'`.

    :param name: Name of the `PipelineItem` that produced the result.
    :type name: str, optional
    :param data: The wrapped result itself.
    :type data: Any, optional
    :param schema: Schema of the `PipelineItem` that produced the result.
    :type schema: str, optional
    """
    def __init__(self, name=None, data=None, schema=None):
        super().__init__()
        for key, value in (('name', name), ('data', data), ('schema', schema)):
            self[key] = value
class PipelineItem(RunConfig):
    """Class representing a single item in a `Pipeline` object.

    :ivar logger: CHAP logger.
    :vartype logger: logging.Logger, optional
    :ivar name: `Pipeline` object name.
    :vartype name: str, optional
    :ivar schema: `Pipeline` object schema.
    :vartype schema: str, optional
    """
    logger: Optional[logging.Logger] = None
    name: Optional[constr(strip_whitespace=True, min_length=1)] = None
    schema_: Optional[constr(strip_whitespace=True, min_length=1)] = \
        Field(None, alias='schema')

    _method: MethodType = PrivateAttr(default=None)
    _method_type: Literal[
        'read', 'process', 'write'] = PrivateAttr(default=None)
    # default_factory gives every instance its own fresh container
    _args: dict = PrivateAttr(default_factory=dict)
    _allowed_args: conlist(item_type=str) = PrivateAttr(default_factory=list)
#    _metadata: dict = PrivateAttr(default=None)
#    _provenance: dict = PrivateAttr(default=None)
    _status: Literal[
        'read', 'write_pending', 'written'] = PrivateAttr(default=None)

    @model_validator(mode='after')
    def validate_pipelineitem_after(self):
        """Validate the `PipelineItem` configuration.

        Sets `__name__` (the item's `name` or its class name), creates
        a logger if none was given, and selects the execution method:
        the first of `read`, `process` or `write` that the item has.
        The method's positional-or-keyword parameter names are stored
        as the arguments `set_args` and `execute` may pass to it.

        :return: Validated configuration.
        :rtype: PipelineItem
        """
        # System modules
        from inspect import signature

        if self.name is None:
            self.__name__ = self.__class__.__name__
        else:
            self.__name__ = self.name
        if self.logger is None:
            self.logger = logging.getLogger(self.__name__)
            self.logger.propagate = False
            # logging.getLogger returns the same logger object for the
            # same name and items are instantiated more than once (e.g.
            # during pipeline validation and again during execution), so
            # attach the handler only once to avoid duplicated records.
            if not self.logger.handlers:
                log_handler = logging.StreamHandler()
                log_handler.setFormatter(logging.Formatter(
                    '{asctime}: {name:20}: {levelname}: {message}',
                    datefmt='%Y-%m-%d %H:%M:%S', style='{'))
                self.logger.addHandler(log_handler)
        self.logger.setLevel(self.log_level)

        # Optional, but it's already available in the 'name' field
        #if self.get_schema() is None:
        #    mod_name = '.'.join(self.__class__.__module__.split('.')[1:])
        #    self.schema_ = f'{mod_name}.{self.__class__.__name__}'

        if hasattr(self, 'read'):
            self._method_type = 'read'
        elif hasattr(self, 'process'):
            self._method_type = 'process'
        elif hasattr(self, 'write'):
            self._method_type = 'write'
        else:
            return self
        self._method = getattr(self, self._method_type)
        sig = signature(self._method)
        # Only these parameter names are accepted by set_args/execute
        self._allowed_args = [
            k for k, v in sig.parameters.items()
            if v.kind == v.POSITIONAL_OR_KEYWORD]
        return self

    @property
    def method(self):
        """Return the `PipelineItem`\\s `read`, `process` or `write`
        method.

        :type: types.MethodType
        """
        return self._method

    @property
    def method_type(self):
        """Return the `PipelineItem`\\s execute method type.

        :type: Literal['read', 'process', 'write']
        """
        return self._method_type

    @property
    def run_config(self):
        """Return the `PipelineItem`\\s run configuration as a
        dictionary (the dump of a `RunConfig` built from this item).

        :type: dict
        """
        return RunConfig(**self.model_dump()).model_dump()

    @property
    def status(self):
        """Return the `PipelineItem`\\s status.

        :type: Literal['read', 'write_pending', 'written']
        """
        return self._status

    @status.setter
    def status(self, status):
        """Set the `PipelineItem`\\s status.

        :param status: `PipelineItem`\\s status.
        :type: Literal['read', 'write_pending', 'written']
        """
        self._status = status

    def get_args(self):
        """Return the `PipelineItem`\\s execution method run time
        arguments.

        :type: dict
        """
        return self._args

    def set_args(self, **args):
        """Set the `PipelineItem`\\s execution method run time arguments
        that are allowed by its method declaration; any other keyword
        is silently ignored.

        :param: `PipelineItem`\\s execution method run time arguments.
        :type: dict
        """
        for k, v in args.items():
            if k in self._allowed_args:
                self._args[k] = v

    def has_filename(self):
        """Does the `PipelineItem` have a `filename` attribute that is
        not `None`?

        :return: `True` if the `PipelineItem` has a `filename`
            attribute that is not `None`.
        :rtype: bool
        """
        return hasattr(self, 'filename') and self.filename is not None

    def get_schema(self):
        """Return the `PipelineItem`\\s schema.

        :type: str
        """
        return self.schema_

    def get_config(
            self, data=None, config=None, schema=None, remove=True):
        """Look through `data` for the last item which value for the
        `'schema'` key matches `schema`. Convert the value for that
        item's `'data'` key into the configuration's
        `Pydantic <https://github.com/pydantic/pydantic>`__ model
        identified by `schema` and return it. If no item is found and
        `config` is a dictionary, validate `config` against the
        configuration's Pydantic model identified by `schema` and
        return it.

        For a `read` item a missing `'inputdir'` key defaults to the
        item's `inputdir`; for a `write` item a missing `'outputdir'`
        key defaults to the item's `outputdir`. Neither `config` nor
        the dictionaries stored in `data` are modified by this.

        :param data: Input data.
        :type data: list[PipelineData], optional
        :param config: Initialization parameters for an instance of
            the `Pydantic <https://github.com/pydantic/pydantic>`__
            model identified by `schema`, required if data is
            unspecified, invalid or does not contain an item that
            matches the schema, supersedes any equal parameters
            contained in `data`.
        :type config: dict, optional
        :param schema: Schema of the `PipelineItem` class to match in
            `data`, defaults to the internal PipelineItem `schema`
            attribute.
        :type schema: str, optional
        :param remove: If there is a matching entry in `data`, remove
            it from the list, defaults to `True`.
        :type remove: bool, optional
        :raises ValueError: If there's no match for `schema` in `data`
            and `config` is not a dictionary.
        :return: Last matching validated configuration model.
        :rtype: instance of the model class identified by `schema`
        """
        # System modules
        from importlib import import_module

        t0 = time()
        if schema is None:
            schema = self.schema_
        self.logger.debug(f'Getting {schema} configuration')
        matching_config = False
        if data is not None:
            try:
                for i, d in reversed(list(enumerate(data))):
                    if d.get('schema') == schema:
                        matching_config = d.get('data')
                        if remove:
                            data.pop(i)
                        break
            except Exception as exc:
                # Best effort: data that cannot be searched is treated
                # as "no match", but leave a trace for debugging
                self.logger.debug(
                    f'Unable to search data for schema {schema}: {exc}')
        if matching_config:
            if config is not None:
                # Local modules
                from CHAP.utils.general import dictionary_update

                # Update matching_config with config if both exist
                matching_config = dictionary_update(matching_config, config)
        else:
            if isinstance(config, dict):
                matching_config = config
            else:
                raise ValueError(
                    f'Unable to find a configuration for schema `{schema}`')
        # Build new dicts rather than assigning keys in place so the
        # caller's config / the dict held in data are left untouched
        if self._method_type == 'read' and 'inputdir' not in matching_config:
            matching_config = {**matching_config, 'inputdir': self.inputdir}
        if self._method_type == 'write' and 'outputdir' not in matching_config:
            matching_config = {
                **matching_config, 'outputdir': self.outputdir}
        mod_name, cls_name = schema.rsplit('.', 1)
        module = import_module(f'CHAP.{mod_name}')
        model_config = getattr(module, cls_name)(**matching_config)
        self.logger.debug(
            f'Got {schema} configuration in {time()-t0:.3f} seconds')
        return model_config

    @staticmethod
    def get_data(data, name=None, schema=None, remove=True):
        """Look through `data` for the last item which `'data'` value
        is a NeXus style
        `NXobject <https://manual.nexusformat.org/classes/base_classes/NXobject.html#index-0>`__
        object or matches a given name or schema. Pick the last item
        for which the `'name'` key matches `name` if set or the
        `'schema'` key matches `schema` if set, pick the last match
        for a `NXobject` object otherwise. Return the data object.

        :param data: Input data.
        :type data: list[PipelineData].
        :param name: Name of the `PipelineItem` class to match in `data`.
        :type name: str, optional
        :param schema: Schema of the `PipelineItem` class to match in
            `data` & return.
        :type schema: str | list[str], optional
        :param remove: If there is a matching entry in `data`, remove
            it from the list, defaults to `True`.
        :type remove: bool, optional
        :raises ValueError: If there's no match for `name` or `schema`
            in `data`, or if there is no object of type
            nexusformat.nexus.NXobject.
        :return: Last matching data item.
        :rtype: Any
        """
        # Third party modules
        from nexusformat.nexus import NXobject

        result = None
        if name is None and schema is None:
            for i, d in reversed(list(enumerate(data))):
                if isinstance(d.get('data'), NXobject):
                    result = d.get('data')
                    if remove:
                        data.pop(i)
                    break
            else:
                raise ValueError('No NXobject data item found')
        elif name is not None:
            for i, d in reversed(list(enumerate(data))):
                if d.get('name') == name:
                    result = d.get('data')
                    if remove:
                        data.pop(i)
                    break
            else:
                raise ValueError(f'No match for data item named "{name}"')
        elif schema is not None:
            if isinstance(schema, str):
                schema = [schema]
            for i, d in reversed(list(enumerate(data))):
                if d.get('schema') in schema:
                    result = d.get('data')
                    if remove:
                        data.pop(i)
                    break
            else:
                raise ValueError(
                    f'No match for data item with schema "{schema}"')
        return result

    @staticmethod
    def get_default_nxentry(nxobject):
        """Given a NeXus style
        `NXroot <https://manual.nexusformat.org/classes/base_classes/NXroot.html#index-0>`__
        object or a NeXus style
        `NXentry <https://manual.nexusformat.org/classes/base_classes/NXentry.html#index-0>`__
        object, return the default or first `NXentry` match. A warning
        is printed when a `NXroot` without a default holds several
        `NXentry` objects.

        :param nxobject: Input data.
        :type nxobject: nexusformat.nexus.NXroot
            | nexusformat.nexus.NXentry
        :raises ValueError: If unable to retrieve a `NXentry` object.
        :return: Input data if a `NXentry` object or the default or
            first `NXentry` object if a `NXroot` object.
        :rtype: nexusformat.nexus.NXentry
        """
        # Third party modules
        from nexusformat.nexus import (
            NXentry,
            NXroot,
        )

        if isinstance(nxobject, NXroot):
            if 'default' in nxobject.attrs:
                nxentry = nxobject[nxobject.default]
            else:
                nxentries = [
                    v for v in nxobject.values() if isinstance(v, NXentry)]
                if not nxentries:
                    raise ValueError('Unable to retrieve a NXentry object')
                if len(nxentries) != 1:
                    print('WARNING: Found multiple NXentries, returning the '
                          'first')
                nxentry = nxentries[0]
        elif isinstance(nxobject, NXentry):
            nxentry = nxobject
        else:
            raise ValueError(f'Invalid parameter nxobject ({nxobject})')
        return nxentry

    @staticmethod
    def get_nxroot(nxobject):
        """Given a NeXus style
        `NXroot <https://manual.nexusformat.org/classes/base_classes/NXroot.html#index-0>`__
        object or a NeXus style
        `NXentry <https://manual.nexusformat.org/classes/base_classes/NXentry.html#index-0>`__
        object, return a `NXroot` object with the appropriate default
        path to the `NXentry` object set.

        :param nxobject: Input data.
        :type nxobject: nexusformat.nexus.NXroot
            | nexusformat.nexus.NXentry
        :raises ValueError: If `nxobject` is neither a `NXroot` nor a
            `NXentry` object.
        :return: Input data if a `NXroot` object or a new `NXroot`
            object with the input as its default `NXentry` object.
        :rtype: nexusformat.nexus.NXroot
        """
        # Third party modules
        from nexusformat.nexus import (
            NXentry,
            NXroot,
        )

        if isinstance(nxobject, NXroot):
            nxroot = nxobject
        elif isinstance(nxobject, NXentry):
            nxroot = NXroot()
            nxroot[nxobject.nxname] = nxobject
            # Set the default on the entry as stored in nxroot: when the
            # entry already has a parent group nexusformat may attach a
            # copy, and set_default acts on the entry's own parent.
            nxroot[nxobject.nxname].set_default()
        else:
            raise ValueError(f'Invalid nxobject ({type(nxobject)})')
        return nxroot

    @staticmethod
    def get_pipelinedata_item(data, index=-1, remove=False):
        """If 'data' is a list, then retrieve from `data` the list item
        matching `index` and return it's `data` value, otherwise return
        `data` itself.

        :param data: Input data.
        :type data: Any | list[PipelineData]
        :param index: List index of the item to retrieve from `data`,
            default to -1 or the last item in the list.
        :type index: int, optional
        :param remove: If there is a matching entry in `data`, remove
            it from the list, defaults to `False`.
        :type remove: bool, optional
        :return: Matching data item.
        :rtype: Any
        """
        if isinstance(data, list):
            if remove:
                return data.pop(index)['data']
            return data[index]['data']
        return data

    @staticmethod
    def unwrap_pipelinedata(data):
        """Given a list of PipelineData objects, return a list of their
        `data` values; list items that are not `PipelineData` are
        passed through and a non-list input is wrapped in a list.

        :param data: Input data to read, write, or process that needs
            to be unwrapped from PipelineData before use.
        :type data: list[PipelineData]
        :return: `'data'` values of the items in the input data.
        :rtype: list
        """
        unwrapped_data = []
        if isinstance(data, list):
            for d in data:
                if isinstance(d, PipelineData):
                    unwrapped_data.append(d['data'])
                else:
                    unwrapped_data.append(d)
        else:
            unwrapped_data = [data]
        return unwrapped_data

    def execute(self, data):
        """Execute the appropriate method of the object and return the
        result.

        `data` is passed to the execution method only when that method
        declares a `data` parameter.

        :param data: Input data.
        :type data: list[PipelineData]
        :return: Result of executing read, process, or write, exactly
            as returned by that method (wrapping into `PipelineData`
            is left to the caller).
        :rtype: Any
        """
        # Former signature: execute(self, data, metadata, provenance)
        # self._metadata = metadata
        # self._provenance = provenance
        if 'data' in self._allowed_args:
            self._args['data'] = data
        t0 = time()
        self.logger.debug(f'Executing "{self._method_type}" with schema '
                          f'"{self.get_schema()}" and {self._args}')
        self.logger.info(f'Executing "{self._method_type}"')
        data = self._method(**self._args)
        # Log the method type, not the bound method's (very long) repr
        self.logger.info(
            f'Finished "{self._method_type}" in {time()-t0:.0f} seconds\n')
        return data
class Pipeline(CHAPBaseModel):
    """Class representing a full `Pipeline` object.

    :ivar args: List of `PipelineItem` arguments for each item in the
        full pipeline.
    :vartype args: list[dict]
    :ivar logger: CHAP logger.
    :vartype logger: logging.Logger, optional
    :ivar mmcs: List of `PipelineItem`\\s classes in the full pipeline.
    :vartype mmcs: list[pydantic._internal._model_construction.ModelMetaclass]
    """
    args: conlist(item_type=dict, min_length=1)
    logger: Optional[logging.Logger] = None
    mmcs: conlist(item_type=ModelMetaclass, min_length=1)

    # default_factory gives every instance its own fresh container
    _data: conlist(item_type=PipelineData) = PrivateAttr(default_factory=list)
    _items: conlist(item_type=PipelineItem) = PrivateAttr(
        default_factory=list)
    #_output_filenames: conlist(item_type=FilePath) = PrivateAttr(default=[])
    _filename_mapping: dict = PrivateAttr(default_factory=dict)
#    _metadata: dict = PrivateAttr(
#        default={'application': 'CHAP', 'user_metadata': {}})
#    _provenance: dict = PrivateAttr(default={})

    @model_validator(mode='after')
    def validate_pipeline_after(self):
        """Validate the `Pipeline` configuration and initialize and
        validate the private attributes.

        Instantiates every `PipelineItem` from `mmcs`/`args`, tracks
        file based readers and writers in `_filename_mapping`, rejects
        a second write to the same file unless `force_overwrite` is
        set, and already runs every `read` item whose file is neither
        read nor pending a write, storing its result in `_data`.

        :raises ValueError: When writing to a file already written by
            an earlier item without `force_overwrite`.
        :return: Validated configuration.
        :rtype: Pipeline
        """
        t0 = time()
        self.__name__ = self.__class__.__name__
        if self.logger is None:
            self.logger = logging.getLogger(self.__name__)
            self.logger.propagate = False

        # Files written by earlier items in this pipeline
        output_filenames = []
        for mmc, args in zip(self.mmcs, self.args):
            # FIX add a validation status, so that the validator
            # doesn't get executed twice with the config staying
            # on the pipeline for processors
            self.logger.info(f'Validating {mmc}')
            item = mmc(data=self._data, modelmetaclass=mmc, **args)
            if item.has_filename():
                if item.method_type == 'read':
                    if item._mapping_filename in self._filename_mapping:
                        item.filename = self._filename_mapping[
                            item._mapping_filename]['path']
                        item.status = self._filename_mapping[
                            item._mapping_filename]['status']
                    #elif item.filename in self._output_filenames:
                    elif item.filename in output_filenames:
                        self._filename_mapping[item._mapping_filename] = {
                            'path': item.filename, 'status': 'write_pending'}
                        item.status = 'write_pending'
                    else:
                        self._filename_mapping[item._mapping_filename] = {
                            'path': item.filename, 'status': None}
                elif item.method_type == 'write':
                    if (not item.force_overwrite
                            and item.filename in output_filenames):
                        #and self.filename in self._output_filenames):
                        raise ValueError(
                            'Writing to an existing file without overwrite '
                            f'permission. Remove {item.filename} or set '
                            '"force_overwrite" in the pipeline configuration '
                            f'for {item.name}')
            item.set_args(**args)
            if (item.method_type == 'read'
                    and item.status not in ('read', 'write_pending')):
                if item.get_schema() is not None:
                    self.logger.debug(
                        f'Reading "{item.name}" with schema '
                        f'"{item.get_schema()}" and {item.get_args()}')
                self.logger.info(f'Reading "{item.name}"')
                data = item.method(**item.get_args())
                self._data.append(PipelineData(
                    name=item.name, data=data, schema=item.get_schema()))
                if item.has_filename():
                    self._filename_mapping[
                        item._mapping_filename]['status'] = 'read'
                # FIX make part of pipelineitem for read
                item.status = 'read'
            if item.method_type == 'write' and item.has_filename():
                # Every reader of this file must wait for the write
                for entry in self._filename_mapping.values():
                    if entry['path'] == item.filename:
                        entry['status'] = 'write_pending'
                #if item.filename not in self._output_filenames:
                #    self._output_filenames.append(item.filename)
                if item.filename not in output_filenames:
                    output_filenames.append(item.filename)
            self._items.append(item)
        self.logger.info(f'Validated pipeline in {time()-t0:.3f} seconds')
        return self

    def execute(self):
        """Executes the pipeline.

        Each item is re-instantiated from its class and arguments and
        executed in order. File readers whose file is already marked
        `'read'` are skipped. A reader's result is appended only if an
        equal `PipelineData` item is not already present. Other results
        are appended as `PipelineData` (tuples item by item, `None`
        results are dropped). After a file writer runs, every mapping
        entry for its file is marked `'written'`.

        :return: List of `PipelineData` items after pipeline execution.
        :rtype: list[PipelineData]
        """
        t0 = time()
        self.logger.info('Executing "execute"\n')
        for mmc, item, args in zip(self.mmcs, self._items, self.args):
            if hasattr(item, 'execute'):
                current_item = mmc(
                    data=self._data, modelmetaclass=mmc, **args)
                read_status = None
                if item.method_type == 'read' and item.has_filename():
                    read_status = self._filename_mapping[
                        item._mapping_filename]['status']
                    current_item.status = read_status
                    current_item.filename = item.filename
                current_item.set_args(**item.get_args())
                # FIX RV update to only read when not yet read or when
                # written to in the mean time, make this happen for any
                # type of read, from file, url, ...
                if not (item.method_type == 'read' and read_status == 'read'):
                    self.logger.info(
                        f'Calling "execute" on {current_item.name}')
                    data = current_item.execute(self._data)
                    # self._data, self._metadata, self._provenance)
                    if current_item.method_type == 'read':
                        new_data = PipelineData(
                            name=current_item.name, data=data,
                            schema=current_item.get_schema())
                        # Skip data that is already present (e.g. read
                        # during validation); search newest first
                        if not any(d == new_data
                                   for d in reversed(self._data)):
                            self._data.append(new_data)
                        #FIX RF move to pipelineitem after read
                        current_item.status = 'read'
                    elif isinstance(data, tuple):
                        self._data.extend(
                            [d if isinstance(d, PipelineData)
                             else PipelineData(
                                 name=current_item.name, data=d,
                                 schema=current_item.get_schema())
                             for d in data])
                    elif isinstance(data, PipelineData):
                        self._data.append(data)
                    elif data is not None:
                        self._data.append(PipelineData(
                            name=current_item.name, data=data,
                            schema=current_item.get_schema()))
                if item.method_type == 'write' and item.has_filename():
                    for entry in self._filename_mapping.values():
                        if entry['path'] == item.filename:
                            entry['status'] = 'written'
        self.logger.info(f'Executed "execute" in {time()-t0:.3f} seconds')
        return self._data