#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module defining the base `Processor` class to derive all others
from.
"""
# System modules
import argparse
import logging
from sys import modules
# Third party modules
from pydantic import model_validator
# Local modules
from CHAP.pipeline import PipelineItem
class Processor(PipelineItem):
    """Base processor.

    The job of any `Processor` in a pipeline is to receive data
    returned by a previous `PipelineItem`, process it in some way,
    and return the result for the following `PipelineItem`\\s to use.
    """
    # FIX add a validation status, so that the validator doesn't get
    # executed twice with the config staying on the pipeline

    @model_validator(mode='before')
    @classmethod
    def validate_processor_before(cls, data):
        """Validate the `Processor` class attributes.

        When the validator input is a dictionary holding both a
        `'data'` and a `'modelmetaclass'` entry, every key listed in
        the default of the model class' `pipeline_fields` field is
        looked up in the pipeline data (via the model class'
        `get_data`). A value found this way is stored under that key,
        after being combined with any value already supplied for it.
        Any other input is returned untouched.

        :param data:
            `Pydantic <https://github.com/pydantic/pydantic>`__
            validator data object.
        :type data: dict
        :return: Currently validated class attributes.
        :rtype: dict
        """
        # System modules
        from copy import deepcopy

        # Local modules
        from CHAP.utils.general import (
            dictionary_update,
            is_str_or_str_series,
        )

        # Only dictionaries carrying both the pipeline data and the
        # model class can be filled in from the pipeline
        if not isinstance(data, dict):
            return data
        if 'data' not in data or 'modelmetaclass' not in data:
            return data

        mmc = data['modelmetaclass']
        pipeline_fields = mmc.model_fields.get('pipeline_fields')
        if pipeline_fields is None:
            return data

        for k, v in pipeline_fields.default.items():
            # An entry is either the schema itself or a mapping with
            # a 'schema' and an optional 'merge_key_paths' item
            # NOTE(review): presumably `is_str_or_str_series` accepts
            # a string or a series of strings -- confirm in
            # CHAP.utils.general
            if is_str_or_str_series(v, log=False):
                schema = v
                merge_key_paths = None
            else:
                schema = v.get('schema')
                merge_key_paths = v.get('merge_key_paths')

            # Copy the value so that the item that stays on the
            # pipeline (remove=False) is not modified
            try:
                value = deepcopy(mmc.get_data(
                    data['data'], schema=schema, remove=False))
            except Exception:
                # Best effort: if the lookup fails for any reason,
                # leave this field as it was supplied
                continue

            if k in data:
                # NOTE(review): presumably the explicitly supplied
                # `data[k]` is merged into the pipeline value --
                # confirm against `dictionary_update`
                data[k] = dictionary_update(
                    value, data[k],
                    merge_key_paths=merge_key_paths,
                    sort=True)
            else:
                data[k] = value

        return data

    def process(self, data):
        """Extract the contents of the input data, add a string to it,
        and return the amended value.

        :param data: Input data.
        :type data: list[PipelineData]
        :return: Processed data, or an empty list if there is no
            input data to process.
        :rtype: str or list
        """
        # If needed, extract data from a returned value of Reader.read
        if isinstance(data, list):
            if not data:
                # `all()` is True for an empty list, so indexing the
                # first item below would raise an IndexError; treat
                # an empty list the same as missing data
                return []
            if all(isinstance(d, dict) for d in data):
                data = data[0]['data']
        if data is None:
            return []
        # The process operation is a simple string concatenation
        data += 'process part\n'
        # Return data back to pipeline
        return data
class OptionParser():
    """Command line option parser for running a processor as a
    script.

    The configured `argparse.ArgumentParser` is available as the
    `parser` attribute.
    """

    def __init__(self):
        # Each entry is the option flag followed by the keyword
        # arguments handed to `add_argument`
        option_table = (
            ('--data', {
                'action': 'store',
                'dest': 'data',
                'default': '',
                'help': 'Input data',
            }),
            ('--processor', {
                'action': 'store',
                'dest': 'processor',
                'default': 'Processor',
                'help': 'Processor class name',
            }),
            ('--log-level', {
                'choices': logging._nameToLevel.keys(),
                'dest': 'log_level',
                'default': 'INFO',
                'help': 'logging level',
            }),
        )
        arg_parser = argparse.ArgumentParser(prog='PROG')
        for flag, settings in option_table:
            arg_parser.add_argument(flag, **settings)
        self.parser = arg_parser
def main(opt_parser=OptionParser):
    """Run a processor of this module from the command line.

    The command line is parsed, the processor class named by the
    `--processor` option is looked up in this module and
    instantiated, its logger is set to the requested level and given
    a stream handler, and its `process` method is called with the
    `--data` option value.

    :param opt_parser: User based option parser.
    :type opt_parser: OptionParser
    :raises AttributeError: If this module has no attribute with the
        requested processor name.
    """
    args = opt_parser().parser.parse_args()

    # Resolve the requested processor class by name in this module
    requested = args.processor
    this_module = modules[__name__]
    try:
        processor_cls = getattr(this_module, requested)
    except AttributeError:
        print(f'Unsupported processor {requested}')
        raise

    processor = processor_cls()

    # Configure the logging of the processor
    level = getattr(logging, args.log_level)
    processor.logger.setLevel(level)
    stream_handler = logging.StreamHandler()
    formatter = logging.Formatter('{name:20}: {message}', style='{')
    stream_handler.setFormatter(formatter)
    processor.logger.addHandler(stream_handler)

    processor.process(args.data)
# Run the command line entry point only when this file is executed as
# a script, not when it is imported as a module
if __name__ == '__main__':
    main()