Source code for CHAP.runner

#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Main functions to execute a ChessAnalysisPipeline (CHAP)."""


def parser():
    """Return an argument parser for the CHAP command line interface
    (CLI).

    This parser accepts one required argument: the input CHAP
    configuration file name and several optional arguments. Execute:

    .. code-block:: bash

        $ CHAP --help

    from the command line for a description on how to use CHAP.

    :return: Argument parser for the CHAP CLI.
    :rtype: argparse.ArgumentParser
    """
    # System modules
    from argparse import ArgumentParser

    # Use the actual command name so that usage and error messages
    # read "CHAP ..." instead of the argparse documentation
    # placeholder "PROG"
    pparser = ArgumentParser(prog='CHAP')
    pparser.add_argument(
        'config', action='store', default='',
        help='Input configuration file')
    pparser.add_argument(
        '-p', '--pipeline', nargs='*', help='Pipeline name(s)')
    # `--regex` without a value selects `re.match` (the `const`);
    # when the option is absent `regex_function` is False
    pparser.add_argument(
        '--regex', nargs='?', default=False, const='match',
        choices=['match', 'search', 'fullmatch'],
        dest='regex_function', required=False,
        help='''Name of Python RegEx function
        (https://docs.python.org/3/howto/regex.html) to use for
        matching configured pipeline names against the string provided
        with the -p / --pipeline option.'''
    )
    pparser.add_argument(
        '--batch', action='store_true',
        help='''Enables "batch mode" operation where every
        sub-pipeline is run in separate parallel processes. Log files
        for each pipeline process will be created in the directory
        specified with the `--batch-logdir` option.'''
    )
    pparser.add_argument(
        '--batch-logdir', default='./CHAP_logs', dest='logdir',
        help='''Destination directory for individual pipeline log
        files when running multiple pipelines in batch mode.'''
    )
    return pparser
def main():
    """Entry point of the CHAP command line interface.

    Parses the command line arguments, reads the YAML configuration
    file, selects the requested (sub-)pipelines and executes them:
    under the Python profiler when the run configuration asks for
    profiling, as separate processes in batch mode, or otherwise as
    one combined pipeline. When executed as an MPI worker spawned by
    another Processor, the worker specific configuration file is read
    instead and the worker is disconnected from its parent at the end.

    :raises ValueError: If a requested pipeline name is not found in
        the pipeline configuration.
    :raises NotImplementedError: If batch mode is requested together
        with profiling.
    """
    # System modules
    import re
    from yaml import safe_load

    # Local modules
    from CHAP.models import RunConfig

    # MPI is optional: fall back to a serial run without mpi4py
    try:
        # Third party modules
        # pylint: disable=c-extension-no-member
        from mpi4py import MPI
    except ImportError:
        have_mpi = False
        comm = None
    else:
        have_mpi = True
        comm = MPI.COMM_WORLD

    cli_args = parser().parse_args()
    configfile = cli_args.config

    # Read the input config file
    with open(configfile, encoding='utf-8') as stream:
        config = safe_load(stream)
        #RV Add to input_files in provenance data writer

    # Check if executed as a worker spawned by another Processor
    run_config = RunConfig(**config.pop('config'), comm=comm)
    if have_mpi and run_config.spawn:
        # pylint: disable=c-extension-no-member
        sub_comm = MPI.Comm.Get_parent()
        common_comm = sub_comm.Merge(True)
        # Read worker specific input config file; the sign of `spawn`
        # selects the communicator providing both the rank suffix of
        # the file name and the communicator for the run configuration
        if run_config.spawn > 0:
            worker_rank = common_comm.Get_rank()
            worker_comm = common_comm
        else:
            worker_rank = sub_comm.Get_rank()
            worker_comm = comm
        with open(f'{configfile}_{worker_rank}', encoding='utf-8') as stream:
            config = safe_load(stream)
        run_config = RunConfig(**config.pop('config'), comm=worker_comm)
    else:
        common_comm = comm

    # Get the pipeline configurations
    # (`pipeline_config` is the concatenation of all selected
    # sub-pipelines, `batch_pipelines` keeps them separated by name)
    pipeline_config = []
    batch_pipelines = []
    requested = cli_args.pipeline
    if requested is None:
        for name, sub_pipeline in config.items():
            pipeline_config.extend(sub_pipeline)
            batch_pipelines.append((name, sub_pipeline))
    else:
        for sub_pipeline in requested:
            if sub_pipeline in config:
                # Exact name match
                selected = [sub_pipeline]
            elif cli_args.regex_function:
                # Treat the requested name as a regular expression
                match_func = getattr(re, cli_args.regex_function)
                selected = [p for p in config if match_func(sub_pipeline, p)]
                if not selected:
                    raise ValueError(
                        f'No pipelines matching "{sub_pipeline}" found in the '
                        f'pipeline configuration ({list(config.keys())})'
                    )
            else:
                raise ValueError(
                    f'Invalid pipeline option: \'{sub_pipeline}\' missing in '
                    f'the pipeline configuration ({list(config.keys())})'
                )
            for selected_name in selected:
                pipeline_config.extend(config.get(selected_name))
                batch_pipelines.append(
                    (selected_name, config.get(selected_name)))

    # Run the pipeline with or without profiling
    if run_config.profile:
        if cli_args.batch:
            raise NotImplementedError(
                'Cannot use --batch mode when profile is True.'
            )
        # System modules
        from cProfile import runctx  # python profiler
        from pstats import Stats     # profiler statistics

        # The command string is evaluated against locals(), so the
        # local names it refers to must not be renamed
        cmd = 'runner(run_config, pipeline_config, common_comm)'
        runctx(cmd, globals(), locals(), 'profile.dat')
        Stats('profile.dat').sort_stats('cumulative').print_stats()
    elif cli_args.batch:
        # System modules
        import multiprocessing as mp
        import os
        from time import time

        t_start = time()
        print(f'Running {len(batch_pipelines)} pipelines in batch mode...')
        os.makedirs(cli_args.logdir, exist_ok=True)

        # Start one process per sub-pipeline, each with its own log
        workers = []
        for name, _pipeline in batch_pipelines:
            log_file = os.path.abspath(
                os.path.join(cli_args.logdir, f'{name}.log')
            )
            worker = mp.Process(
                target=batch_runner,
                args=(run_config, _pipeline, log_file)
            )
            worker.start()
            workers.append(worker)

        # Wait for all of them to finish
        for worker in workers:
            worker.join()
        print(f'Done in {time()-t_start:.3f} seconds.')
    else:
        runner(run_config, pipeline_config, common_comm)

    # Disconnect the spawned worker
    if have_mpi and run_config.spawn:
        common_comm.barrier()
        sub_comm.Disconnect()
def runner(run_config, pipeline_config, comm=None):
    """Set up logging and run a pipeline, reporting the elapsed time.

    :param run_config: CHAP run configuration.
    :type run_config: RunConfig
    :param pipeline_config: CHAP Pipeline configuration (the list of
        pipeline items).
    :type pipeline_config: list
    :param comm: MPI communicator.
    :type comm: mpi4py.MPI.Comm, optional
    :return: Pipeline's returned data field.
    """
    # System modules
    from time import time

    # Create the logger and its handler at the configured level
    chap_logger, chap_handler = set_logger(run_config.log_level)
    chap_logger.info(f'Input pipeline configuration: {pipeline_config}\n')

    # Execute the pipeline and time it
    started = time()
    result = run(
        run_config, pipeline_config,
        logger=chap_logger, log_handler=chap_handler, comm=comm)
    chap_logger.info(f'Executed "run" in {time()-started:.3f} seconds')

    return result
def set_logger(log_level='INFO'):
    """Create the logging handler for CHAP and attach it to this
    module's logger.

    :param log_level: Name of the logger level (case insensitive),
        defaults to `"INFO"`.
    :type log_level: str
    :return: Logger and logging handler.
    :rtype: logging.Logger, logging.StreamHandler
    """
    # System modules
    import logging

    chap_logger = logging.getLogger(__name__)

    # Translate the level name to the numeric `logging` constant
    numeric_level = getattr(logging, log_level.upper())
    chap_logger.setLevel(numeric_level)

    # A new stream handler is created and added on every call
    formatter = logging.Formatter(
        '{asctime}: {name:20}: {levelname}: {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    chap_logger.addHandler(stream_handler)

    return chap_logger, stream_handler
def run(
        run_config, pipeline_config, logger=None, log_handler=None,
        comm=None):
    """Run a given pipeline_config.

    For every item of the pipeline configuration the corresponding
    class is loaded from its module and its runtime arguments are
    assembled from the run configuration and the item specific
    arguments. A `Pipeline` is then built from them and executed.

    :param run_config: CHAP run configuration.
    :type run_config: RunConfig
    :param pipeline_config: CHAP Pipeline configuration: a list whose
        items are either a dotted class name or a single-key
        dictionary mapping the dotted class name to its arguments.
    :type pipeline_config: list
    :param logger: Logger.
    :type logger: logging.Logger, optional
    :param log_handler: Logging handler.
    :type log_handler: logging.StreamHandler, optional
    :param comm: MPI communicator.
    :type comm: mpi4py.MPI.Comm, optional
    :raises OSError: If an item specific input directory does not
        exist or is not readable, or if an item specific output
        directory is not writable.
    :return: `data` field of the first item in the returned list of
        pipeline items (or the pipeline result itself if it is
        empty).
    """
    # System modules
    from logging import getLogger
    import os
    from tempfile import NamedTemporaryFile

    # Local modules
    from CHAP.pipeline import Pipeline

    # Make sure os.makedirs is only called from the root node
    if comm is None:
        rank = 0
    else:
        rank = comm.Get_rank()

    pipeline_args = []
    pipeline_mmcs = []
    for item in pipeline_config:
        # Load individual object with given name from its module
        config = run_config.model_dump()
        if isinstance(item, dict):
            name = list(item.keys())[0]
            item_args = item.get(name)
            if item_args is None:
                item_args = {}
            # Picking "inputdir" and "outputdir" from the item or from
            # the default run configuration, giving precedence to the
            # former
            if 'inputdir' in item_args:
                inputdir = item_args.pop('inputdir')
                if not os.path.isabs(inputdir):
                    inputdir = os.path.normpath(os.path.realpath(
                        os.path.join(run_config.inputdir, inputdir)))
                if not os.path.isdir(inputdir):
                    raise OSError(
                        f'input directory does not exist ({inputdir})')
                if not os.access(inputdir, os.R_OK):
                    raise OSError('input directory is not accessible for '
                                  f'reading ({inputdir})')
                config['inputdir'] = inputdir
            if 'outputdir' in item_args:
                outputdir = item_args.pop('outputdir')
                if not os.path.isabs(outputdir):
                    outputdir = os.path.normpath(os.path.realpath(
                        os.path.join(run_config.outputdir, outputdir)))
                if not rank:
                    if not os.path.isdir(outputdir):
                        os.makedirs(outputdir)
                    # Probe write access with a temporary file that
                    # is closed (and thereby removed) right away
                    try:
                        with NamedTemporaryFile(dir=outputdir):
                            pass
                    except Exception as exc:
                        raise OSError(
                            'output directory is not accessible for '
                            f'writing ({outputdir})') from exc
                config['outputdir'] = outputdir
        else:
            name = item
            # An item given as a plain name has no arguments of its
            # own: start from a fresh dictionary instead of leaving
            # `item_args` unbound or pointing at the previous item's
            item_args = {}
        split_name = name.split('.')
        cls_name = split_name[-1]
        mod_name = '.'.join(split_name[:-1])

        # Initialize the object's identifiers
        if 'users' in name:
            # Load users module. This is required in CHAPaaS which can
            # have common area for users module. Otherwise, we will be
            # required to have individual user's PYTHONPATHs to load
            # user processors.
            try:
                # Third party modules
                # pylint: disable=unused-import
                import users
            except ImportError:
                if logger is not None:
                    logger.error(f'Unable to load {name}')
                continue
            module_name = __import__(mod_name, fromlist=[cls_name])
        else:
            module_name = __import__(f'CHAP.{mod_name}', fromlist=[cls_name])
        try:
            module = getattr(module_name, cls_name)
        except AttributeError:
            # The class is not exposed by the given module: retry
            # with the conventional submodule for its kind
            if ((cls_name.endswith('Processor')
                    or cls_name == 'TomoCHESSMapConverter')
                    and split_name[-2] != 'processor'):
                mod_name += '.processor'
            if cls_name.endswith('Reader') and split_name[-2] != 'reader':
                mod_name += '.reader'
            if cls_name.endswith('Writer') and split_name[-2] != 'writer':
                mod_name += '.writer'
            if 'users' in name:
                module_name = __import__(mod_name, fromlist=[cls_name])
            else:
                module_name = __import__(
                    f'CHAP.{mod_name}', fromlist=[cls_name])
            module = getattr(module_name, cls_name)
        pipeline_mmcs.append(module)

        # Initialize the object's runtime arguments
        item_args['comm'] = comm  #FIX make comm a field in RunConfig?
        if 'name' not in item_args:
            item_args['name'] = cls_name
        item_args.update(config)
        item_logger = getLogger(name)
        if log_handler is not None:
            item_logger.addHandler(log_handler)
        item_args['logger'] = item_logger
        if logger is not None:
            logger.info(
                f'Initialized input fields for an instance of {cls_name}')
        pipeline_args.append(item_args)

    pipeline = Pipeline(mmcs=pipeline_mmcs, args=pipeline_args)
    pipeline.logger.setLevel(run_config.log_level)
    if log_handler is not None:
        pipeline.logger.addHandler(log_handler)
    if logger is not None:
        logger.info(f'Loaded {pipeline} with {len(pipeline_mmcs)} items\n')

    # Make sure os.makedirs completes before continuing all nodes
    if comm is not None:
        comm.barrier()

    # Execute the pipeline
    if logger is not None:
        logger.info(f'Calling "execute" on {pipeline}')
    result = pipeline.execute()
    if result:
        return result[0]['data']
    return result
def batch_runner(run_config, pipeline_config, log_file):
    """Function for running a pipeline in batch mode with logging to
    file. Essentially a wrapper for the :meth:`~CHAP.runner.runner`
    function.

    While the pipeline runs, everything written to `sys.stdout` and
    `sys.stderr` is sent to the log file. Any exception raised by the
    pipeline is caught and its traceback is written to the log file
    as well. Both streams are restored before returning.

    :param run_config: CHAP run configuration.
    :type run_config: RunConfig
    :param pipeline_config: CHAP Pipeline configuration.
    :type pipeline_config: list
    :param log_file: Name of file for logging.
    :type log_file: str
    """
    # System modules
    from contextlib import (
        redirect_stderr,
        redirect_stdout,
    )
    import traceback

    print(f'Logging to {log_file}')
    with open(log_file, 'w', encoding='utf-8') as f:
        # Redirect with context managers so that the standard streams
        # are restored on exit instead of being left bound to the
        # (then closed) log file
        with redirect_stdout(f), redirect_stderr(f):
            try:
                runner(run_config, pipeline_config, None)
            except Exception:
                # Deliberate catch-all at the process boundary: the
                # failure is recorded in the pipeline's log file
                traceback.print_exc()
# Run the command line interface when the module is executed as a script
if __name__ == '__main__': main()