#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Main functions to execute a ChessAnalysisPipeline (CHAP)."""
def parser():
    """Return an argument parser for the CHAP command line interface
    (CLI). This parser accepts one required argument: the input CHAP
    configuration file name and several optional arguments. Execute:

    .. code-block:: bash

        $ CHAP --help

    from the command line for a description on how to use CHAP.

    :return: The CHAP CLI argument parser.
    :rtype: argparse.ArgumentParser
    """
    # System modules
    from argparse import ArgumentParser

    # Show the real program name in usage/help messages instead of a
    # placeholder
    pparser = ArgumentParser(prog='CHAP')
    pparser.add_argument(
        'config', action='store', default='', help='Input configuration file')
    pparser.add_argument(
        '-p', '--pipeline', nargs='*', help='Pipeline name(s)')
    # Without --regex, regex_function is False (exact name matching);
    # a bare --regex selects re.match
    pparser.add_argument(
        '--regex', nargs='?', default=False, const='match',
        choices=['match', 'search', 'fullmatch'],
        dest='regex_function', required=False,
        help='''Name of Python
        RegEx function (https://docs.python.org/3/howto/regex.html)
        to use for matching configured pipeline names against the
        string provided with the -p / --pipeline option.'''
    )
    pparser.add_argument(
        '--batch', action='store_true',
        help='''Enables "batch mode" operation where every sub-pipeline
        is run in separate parallel processes. Log files for each
        pipeline process will be created in the directory specified
        with the `--batch-logdir` option.'''
    )
    pparser.add_argument(
        '--batch-logdir', default='./CHAP_logs', dest='logdir',
        help='''Destination directory for individual pipeline log
        files when running multiple pipelines in batch mode.'''
    )
    return pparser
def main():
    """Main function.

    Parse the command line arguments, load the YAML configuration
    file, select the requested pipeline(s) and run them: under the
    profiler when the run configuration's ``profile`` is set, in
    separate processes with ``--batch``, or directly otherwise. When
    executed as an MPI worker spawned by another Processor (``spawn``
    set in the run configuration), the worker specific configuration
    file ``<config>_<rank>`` is read instead, and the worker
    synchronizes with and disconnects from its parent at the end.
    """
    # Third party modules
    from yaml import safe_load
    # Local modules
    from CHAP.models import RunConfig
    try:
        # Third party modules
        # pylint: disable=c-extension-no-member
        from mpi4py import MPI
        have_mpi = True
        comm = MPI.COMM_WORLD
    except ImportError:
        have_mpi = False
        comm = None

    args = parser().parse_args()

    # Read the input config file
    configfile = args.config
    with open(configfile, encoding='utf-8') as file:
        config = safe_load(file)
    #RV Add to input_files in provenance data writer

    # Check if executed as a worker spawned by another Processor
    run_config = RunConfig(**config.pop('config'), comm=comm)
    # Decide this once, before run_config may be replaced by the worker
    # specific configuration below, so that the final barrier and
    # disconnect always match the Merge done here
    spawned = bool(have_mpi and run_config.spawn)
    sub_comm = None
    if spawned:
        # pylint: disable=c-extension-no-member
        sub_comm = MPI.Comm.Get_parent()
        common_comm = sub_comm.Merge(True)
        # Read worker specific input config file
        if run_config.spawn > 0:
            worker_configfile = f'{configfile}_{common_comm.Get_rank()}'
            worker_comm = common_comm
        else:
            worker_configfile = f'{configfile}_{sub_comm.Get_rank()}'
            worker_comm = comm
        with open(worker_configfile, encoding='utf-8') as file:
            config = safe_load(file)
        run_config = RunConfig(**config.pop('config'), comm=worker_comm)
    else:
        common_comm = comm

    # Get the pipeline configurations
    pipeline_config, batch_pipelines = _select_pipelines(
        config, args.pipeline, args.regex_function)

    # Run the pipeline with or without profiling
    if run_config.profile:
        if args.batch:
            raise NotImplementedError(
                'Cannot use --batch mode when profile is True.'
            )
        # System modules
        from cProfile import runctx  # python profiler
        from pstats import Stats  # profiler statistics
        cmd = 'runner(run_config, pipeline_config, common_comm)'
        runctx(cmd, globals(), locals(), 'profile.dat')
        info = Stats('profile.dat')
        info.sort_stats('cumulative')
        info.print_stats()
    elif args.batch:
        _run_batch(run_config, batch_pipelines, args.logdir)
    else:
        runner(run_config, pipeline_config, common_comm)

    # Disconnect the spawned worker
    if spawned:
        common_comm.barrier()
        sub_comm.Disconnect()


def _select_pipelines(config, sub_pipelines=None, regex_function=False):
    """Select the pipelines to run from the pipeline configuration.

    :param config: Pipeline configuration, mapping pipeline names to
        lists of pipeline items.
    :type config: dict
    :param sub_pipelines: Requested pipeline names (or patterns when
        `regex_function` is set); `None` selects every pipeline.
    :type sub_pipelines: list[str], optional
    :param regex_function: Name of the :mod:`re` function used to
        match a requested name that is not a literal key of `config`;
        falsy disables pattern matching.
    :type regex_function: str or bool, optional
    :raises ValueError: If a requested name is not in `config` and
        does not match any pipeline name.
    :return: The concatenated items of all selected pipelines, and
        the list of selected ``(name, items)`` pairs for batch mode.
    :rtype: tuple(list, list)
    """
    # System modules
    import re

    if sub_pipelines is None:
        selected = list(config)
    else:
        selected = []
        for sub_pipeline in sub_pipelines:
            if sub_pipeline in config:
                selected.append(sub_pipeline)
            elif regex_function:
                match_func = getattr(re, regex_function)
                pipeline_matches = [
                    p for p in config if match_func(sub_pipeline, p)
                ]
                if not pipeline_matches:
                    raise ValueError(
                        f'No pipelines matching "{sub_pipeline}" found in the '
                        f'pipeline configuration ({list(config.keys())})'
                    )
                selected.extend(pipeline_matches)
            else:
                raise ValueError(
                    f'Invalid pipeline option: \'{sub_pipeline}\' missing in '
                    f'the pipeline configuration ({list(config.keys())})'
                )
    batch_pipelines = [(name, config[name]) for name in selected]
    pipeline_config = [
        item for _, items in batch_pipelines for item in items
    ]
    return pipeline_config, batch_pipelines


def _run_batch(run_config, batch_pipelines, logdir):
    """Run each ``(name, items)`` pipeline in its own process, logging
    to ``<logdir>/<name>.log``, and wait for all of them to finish.
    """
    # System modules
    import multiprocessing as mp
    import os
    from time import time

    t0 = time()
    print(f'Running {len(batch_pipelines)} pipelines in batch mode...')
    os.makedirs(logdir, exist_ok=True)
    procs = []
    for name, _pipeline in batch_pipelines:
        log_file = os.path.abspath(os.path.join(logdir, f'{name}.log'))
        proc = mp.Process(
            target=batch_runner,
            args=(run_config, _pipeline, log_file)
        )
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()
    print(f'Done in {time()-t0:.3f} seconds.')
def runner(run_config, pipeline_config, comm=None):
    """Set up the CHAP logger, then run and time a pipeline.

    :param run_config: CHAP run configuration.
    :type run_config: RunConfig
    :param pipeline_config: CHAP pipeline configuration (the list of
        pipeline items).
    :type pipeline_config: list
    :param comm: MPI communicator.
    :type comm: mpi4py.MPI.Comm, optional
    :return: Pipeline's returned data field.
    """
    # System modules
    from time import time

    # Logging setup
    logger, log_handler = set_logger(run_config.log_level)
    logger.info(f'Input pipeline configuration: {pipeline_config}\n')

    # Run the pipeline
    start = time()
    data = run(run_config, pipeline_config, logger, log_handler, comm)
    elapsed = time() - start
    logger.info(f'Executed "run" in {elapsed:.3f} seconds')
    return data
def set_logger(log_level='INFO'):
    """Helper function to set the CHAP logger.

    Sets the level of this module's logger and makes sure exactly one
    CHAP stream handler, writing to the current ``sys.stderr``, is
    attached to it. A handler installed by an earlier call that still
    writes to the current ``sys.stderr`` is reused instead of adding
    another one (which would print every record once per call).
    Handlers from earlier calls that write to a different stream
    (e.g. a since-closed log file) are removed from this logger.

    :param log_level: Logger level name (case-insensitive), defaults
        to `"INFO"`.
    :type log_level: str
    :return: Logger and logging handler.
    :rtype: logging.Logger, logging.StreamHandler
    """
    # System modules
    import logging
    import sys

    logger = logging.getLogger(__name__)
    logger.setLevel(getattr(logging, log_level.upper()))

    log_handler = None
    for handler in list(logger.handlers):
        if not getattr(handler, '_chap_runner', False):
            # Leave handlers installed by anyone else alone
            continue
        if log_handler is None and handler.stream is sys.stderr:
            log_handler = handler
        else:
            logger.removeHandler(handler)

    if log_handler is None:
        # StreamHandler() binds to sys.stderr as it is right now, which
        # matters when stderr has been redirected (batch mode)
        log_handler = logging.StreamHandler()
        log_handler._chap_runner = True  # pylint: disable=protected-access
        log_handler.setFormatter(logging.Formatter(
            '{asctime}: {name:20}: {levelname}: {message}',
            datefmt='%Y-%m-%d %H:%M:%S', style='{'))
        logger.addHandler(log_handler)
    return logger, log_handler
def run(
        run_config, pipeline_config, logger=None, log_handler=None,
        comm=None):
    """Run a given pipeline_config.

    Each item in `pipeline_config` is either a dotted
    ``<module>.<ClassName>`` string or a single-key dictionary mapping
    such a name to the item's keyword arguments (or `None`). For every
    item the class is imported and its runtime arguments are
    assembled; a :class:`CHAP.pipeline.Pipeline` is then built from
    them and executed. The caller's `pipeline_config` is not modified.

    :param run_config: CHAP run configuration.
    :type run_config: RunConfig
    :param pipeline_config: CHAP pipeline configuration (the list of
        pipeline items).
    :type pipeline_config: list
    :param logger: Logger.
    :type logger: logging.Logger, optional
    :param log_handler: Logging handler.
    :type log_handler: logging.StreamHandler, optional
    :param comm: MPI communicator.
    :type comm: mpi4py.MPI.Comm, optional
    :raises OSError: If an item's input directory does not exist or
        is not readable, or its output directory is not writable.
    :return: `data` field of the first item in the returned
        list of pipeline items, or the (falsy) result itself when the
        pipeline returns nothing.
    """
    # System modules
    from logging import getLogger

    # Local modules
    from CHAP.pipeline import Pipeline

    # Only the root node creates and checks output directories
    rank = 0 if comm is None else comm.Get_rank()

    pipeline_args = []
    pipeline_mmcs = []
    for item in pipeline_config:
        # Fresh run configuration for every item, since an item may
        # override its "inputdir"/"outputdir" entries
        config = run_config.model_dump()
        if isinstance(item, dict):
            name = list(item.keys())[0]
            # Shallow copy, so the keys popped and added below do not
            # modify the caller's pipeline configuration
            item_args = dict(item.get(name) or {})
            _resolve_item_dirs(item_args, config, run_config, rank)
        else:
            # A bare class name carries no arguments of its own
            name = item
            item_args = {}
        cls_name = name.split('.')[-1]

        if 'users' in name:
            # Load users module. This is required in CHAPaaS which can
            # have common area for users module. Otherwise, we will be
            # required to have individual user's PYTHONPATHs to load
            # user processors.
            try:
                # Third party modules
                # pylint: disable=unused-import
                import users
            except ImportError:
                if logger is not None:
                    logger.error(f'Unable to load {name}')
                continue
        pipeline_mmcs.append(_import_pipeline_class(name))

        # Initialize the object's runtime arguments; the run
        # configuration entries override same-named item arguments
        item_args['comm'] = comm  #FIX make comm a field in RunConfig?
        if 'name' not in item_args:
            item_args['name'] = cls_name
        item_args.update(config)
        item_logger = getLogger(name)
        if log_handler is not None:
            item_logger.addHandler(log_handler)
        item_args['logger'] = item_logger
        if logger is not None:
            logger.info(
                f'Initialized input fields for an instance of {cls_name}')
        pipeline_args.append(item_args)

    pipeline = Pipeline(mmcs=pipeline_mmcs, args=pipeline_args)
    pipeline.logger.setLevel(run_config.log_level)
    if log_handler is not None:
        pipeline.logger.addHandler(log_handler)
    if logger is not None:
        logger.info(f'Loaded {pipeline} with {len(pipeline_mmcs)} items\n')

    # Make sure os.makedirs completes on the root node before any node
    # continues
    if comm is not None:
        comm.barrier()

    # Execute the pipeline
    if logger is not None:
        logger.info(f'Calling "execute" on {pipeline}')
    result = pipeline.execute()
    if result:
        return result[0]['data']
    return result


def _resolve_item_dirs(item_args, config, run_config, rank):
    """Move an item's "inputdir"/"outputdir" overrides from
    `item_args` into `config`, giving them precedence over the run
    configuration defaults.

    Relative paths are resolved against ``run_config.inputdir`` /
    ``run_config.outputdir``. The input directory must exist and be
    readable; the output directory is created if needed and probed for
    writability, but only on the root node (`rank` 0).

    :raises OSError: If a directory fails its check.
    """
    # System modules
    import os
    from tempfile import NamedTemporaryFile

    if 'inputdir' in item_args:
        inputdir = item_args.pop('inputdir')
        if not os.path.isabs(inputdir):
            inputdir = os.path.normpath(os.path.realpath(
                os.path.join(run_config.inputdir, inputdir)))
        if not os.path.isdir(inputdir):
            raise OSError(
                f'input directory does not exist ({inputdir})')
        if not os.access(inputdir, os.R_OK):
            raise OSError('input directory is not accessible for '
                          f'reading ({inputdir})')
        config['inputdir'] = inputdir
    if 'outputdir' in item_args:
        outputdir = item_args.pop('outputdir')
        if not os.path.isabs(outputdir):
            outputdir = os.path.normpath(os.path.realpath(
                os.path.join(run_config.outputdir, outputdir)))
        if not rank:
            # exist_ok avoids a race with other processes creating the
            # same directory
            os.makedirs(outputdir, exist_ok=True)
            try:
                # Writability probe; leaving the context closes (and
                # thereby deletes) the temporary file immediately
                with NamedTemporaryFile(dir=outputdir):
                    pass
            except OSError as exc:
                raise OSError(
                    'output directory is not accessible for '
                    f'writing ({outputdir})') from exc
        config['outputdir'] = outputdir


def _import_pipeline_class(name):
    """Import and return the class named by the dotted string `name`.

    Names containing ``users`` are imported as given, all others
    relative to the ``CHAP`` package. If the class is not an attribute
    of the named module, it is looked up in that module's
    ``processor``, ``reader`` or ``writer`` submodule, chosen by the
    class name suffix.
    """
    split_name = name.split('.')
    cls_name = split_name[-1]
    mod_name = '.'.join(split_name[:-1])

    def _import_module(module_path):
        if 'users' in name:
            return __import__(module_path, fromlist=[cls_name])
        return __import__(f'CHAP.{module_path}', fromlist=[cls_name])

    module = _import_module(mod_name)
    try:
        return getattr(module, cls_name)
    except AttributeError:
        # Fall back on the conventional submodule for this class type
        if ((cls_name.endswith('Processor')
                or cls_name == 'TomoCHESSMapConverter')
                and split_name[-2] != 'processor'):
            mod_name += '.processor'
        if cls_name.endswith('Reader') and split_name[-2] != 'reader':
            mod_name += '.reader'
        if cls_name.endswith('Writer') and split_name[-2] != 'writer':
            mod_name += '.writer'
        return getattr(_import_module(mod_name), cls_name)
def batch_runner(run_config, pipeline_config, log_file):
    """Function for running a pipeline in batch mode with logging to
    file. Essentially a wrapper for the :meth:`~CHAP.runner.runner`
    function.

    Standard output and standard error are redirected to `log_file`
    only for the duration of the run and restored afterwards. Any
    exception raised by the run is not propagated; its traceback is
    written to the log file instead.

    :param run_config: CHAP run configuration.
    :type run_config: RunConfig
    :param pipeline_config: CHAP pipeline configuration (the list of
        pipeline items).
    :type pipeline_config: list
    :param log_file: Name of file for logging.
    :type log_file: str
    """
    # System modules
    from contextlib import redirect_stderr, redirect_stdout
    import traceback

    print(f'Logging to {log_file}')
    # The redirections are undone before the file is closed, so
    # sys.stdout/sys.stderr never point at a closed file afterwards
    with open(log_file, 'w', encoding='utf-8') as f, \
            redirect_stdout(f), redirect_stderr(f):
        try:
            runner(run_config, pipeline_config, None)
        except Exception:  # pylint: disable=broad-exception-caught
            # Keep the failure in this pipeline's own log file
            traceback.print_exc()
if __name__ == '__main__':
    # Entry point when this module is executed as a script
    main()