CHAP package

Base PipelineItems used in running a ChessAnalysisPipeline (CHAP).

CHAP provides infrastructure to construct and run X-ray data processing and analysis workflows using a set of modular components. We call these components PipelineItems (subclassed into Readers, Processors, and Writers). A pipeline uses a sequence of PipelineItems to execute a data processing workflow where the data returned by one PipelineItem becomes input for the following ones.
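The data flow described above can be sketched in plain Python. This is an illustration only, not CHAP's implementation: read_item, process_item, write_item, and run_pipeline are hypothetical stand-ins for a Reader, a Processor, a Writer, and the pipeline executor, and the sketch assumes that each item receives the list of results accumulated so far.

```python
# Hypothetical stand-ins; none of these names come from CHAP itself.
def read_item(data):
    """Reader-like step: ignores prior results and provides new data."""
    return {"name": "reader", "data": "raw text"}


def process_item(data):
    """Processor-like step: transforms the most recent result."""
    return {"name": "processor", "data": data[-1]["data"].upper()}


def write_item(data):
    """Writer-like step: a real writer would write to a file here."""
    return {"name": "writer", "data": data[-1]["data"]}


def run_pipeline(items):
    """Call each item in order, handing it everything returned so far."""
    data = []
    for item in items:
        data.append(item(data))
    return data


results = run_pipeline([read_item, process_item, write_item])
print([result["name"] for result in results])
```

Keeping every intermediate result in one list is what lets a later item pick out the output of any earlier item, not only the one directly before it.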

Many PipelineItems can be shared by data processing workflows for multiple different X-ray techniques, while others may be unique to just a single technique. The PipelineItems that are shared by many techniques are organized in the CHAP.common subpackage. PipelineItems unique to a tomography workflow, for instance, are organized in the CHAP.tomo subpackage.

utils contains a broad selection of utilities to assist in some common tasks that appear in specific PipelineItem implementations.

Submodules summary

models

Common Pydantic model classes.

pipeline

Base pipeline Pydantic model classes.

processor

Module defining the base Processor class to derive all others from.

reader

Module defining the base Reader class to derive all others from.

runner

Main functions to execute a ChessAnalysisPipeline (CHAP).

server

Python server with thread pool and CHAP pipeline.

taskmanager

Python thread pool.

writer

Module defining the base Writer class to derive all others from.

Subpackages

Submodules

CHAP.models module

Common Pydantic model classes.

class CHAPBaseModel[source]

Bases: BaseModel

Base CHAP configuration class implementing robust serialization tools.

dict(*args, **kwargs)[source]

Dump the class implementation to a dictionary.

Parameters:
  • **kwargs – Arbitrary keyword arguments.

  • exclude (dict or set, optional) – Class variable(s) to omit from the output dictionary.

  • by_alias (bool, optional) – Use aliases as the output dictionary keys for class variables that have an alias, defaults to True.

Type:

dict

Returns:

Class implementation.

Return type:

dict

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_dump(*args, **kwargs)[source]

Dump the class implementation to a dictionary.

Parameters:
  • **kwargs – Arbitrary keyword arguments.

  • exclude (dict or set, optional) – Class variable(s) to omit from the output dictionary.

  • by_alias (bool, optional) – Use aliases as the output dictionary keys for class variables that have an alias, defaults to True.

Type:

dict

Returns:

Class implementation.

Return type:

dict

model_dump_json(*args, **kwargs)[source]

Dump the class implementation to a JSON string.

Parameters:
  • **kwargs – Arbitrary keyword arguments.

  • exclude (dict or set, optional) – Class variable(s) to omit from the output dictionary.

  • by_alias (bool, optional) – Use aliases as the output dictionary keys for class variables that have an alias, defaults to True.

Type:

dict

Returns:

Class implementation.

Return type:

str

class RunConfig(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]

Bases: CHAPBaseModel

Pipeline run configuration class.

Variables:
  • root (str, optional) – Default work directory, defaults to the current run directory.

  • inputdir (str, optional) – Input directory, used only if any input file in the pipeline is not an absolute path, defaults to ‘root’.

  • outputdir (str, optional) – Output directory, used only if any output file in the pipeline is not an absolute path, defaults to ‘root’.

  • interactive (bool, optional) – Allows for user interactions, defaults to False.

  • log_level (Literal[ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], optional) – Logger level (not case sensitive), defaults to ‘INFO’.
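The directory and log level rules listed above can be illustrated with a short sketch. It is not the RunConfig implementation: resolve_path and normalize_log_level are hypothetical helpers, and the assumption is simply that a relative file name is joined to the input or output directory, falling back to root when that directory is unset.

```python
from pathlib import Path

ALLOWED_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


def resolve_path(filename, directory=None, root="."):
    """Hypothetical helper: leave absolute paths alone, otherwise join
    the file name to directory, or to root if directory is None."""
    path = Path(filename)
    if path.is_absolute():
        return path
    base = Path(directory) if directory is not None else Path(root)
    return base / path


def normalize_log_level(log_level):
    """Hypothetical helper: accept any letter case for the log level."""
    level = log_level.upper()
    if level not in ALLOWED_LEVELS:
        raise ValueError(f"Invalid log level: {log_level!r}")
    return level


print(resolve_path("scan.txt", root="work"))
print(normalize_log_level("debug"))
```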

inputdir: Annotated[Path, PathType(path_type=dir)] | None
interactive: bool | None
log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

outputdir: Annotated[Path, PathType(path_type=dir)] | None
property profile

Return the profiling flag.

Type:

bool

root: Annotated[Path, PathType(path_type=dir)] | None
property spawn

Return the spawned worker flag.

Type:

int

classmethod validate_log_level(log_level)[source]

Capitalize log_level.

Parameters:

log_level (str) – Input value for log_level.

Returns:

Capitalized log_level.

Return type:

str

classmethod validate_runconfig_before(data)[source]

Ensure that valid directory paths are provided.

Parameters:

data (dict) – Pydantic validator data object.

Returns:

Currently validated class attributes.

Return type:

dict

CHAP.pipeline module

Base pipeline Pydantic model classes.

class Pipeline[source]

Bases: CHAPBaseModel

Class representing a full Pipeline object.

Variables:
  • args (list[dict]) – List of PipelineItem arguments for each item in the full pipeline.

  • logger (logging.Logger, optional) – CHAP logger.

  • mmcs (list[pydantic._internal._model_construction.ModelMetaclass]) – List of PipelineItem classes in the full pipeline.

args: Annotated[list[dict], Len(min_length=1, max_length=None)]
execute()[source]

Executes the pipeline.

Returns:

List of PipelineData items after pipeline execution.

Return type:

list[PipelineData]

logger: Logger | None
mmcs: Annotated[list[ModelMetaclass], Len(min_length=1, max_length=None)]
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

validate_pipeline_after()[source]

Validate the Pipeline configuration and initialize and validate the private attributes.

Returns:

Validated configuration.

Return type:

Pipeline

class PipelineData(name=None, data=None, schema=None)[source]

Bases: dict

Wrapper for all results of PipelineItem.execute.
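A minimal sketch of such a wrapper, assuming only what the signature and the 'name', 'data', and 'schema' keys mentioned elsewhere in this reference suggest. PipelineDataSketch is a hypothetical name, and the actual class may do more.

```python
class PipelineDataSketch(dict):
    """Dictionary holding one result together with its name and schema."""

    def __init__(self, name=None, data=None, schema=None):
        super().__init__()
        self["name"] = name
        self["data"] = data
        self["schema"] = schema


item = PipelineDataSketch(name="reader", data=[1, 2, 3], schema="example.schema")
print(item["data"])
```

Because the wrapper is a dict, downstream items can look up results by key without knowing which item produced them.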

class PipelineItem(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]

Bases: RunConfig

Class representing a single item in a Pipeline object.

Variables:
  • logger (logging.Logger, optional) – CHAP logger.

  • name (str, optional) – Pipeline object name.

  • schema (str, optional) – Pipeline object schema.

execute(data)[source]

Execute the appropriate method of the object and return the result.

Parameters:

data (list[PipelineData]) – Input data.

Returns:

Wrapped result of executing read, process, or write.

Return type:

PipelineData | tuple[PipelineData]

get_args()[source]

Return the PipelineItem’s execution method runtime arguments.

Type:

dict

get_config(data=None, config=None, schema=None, remove=True)[source]

Look through data for the last item whose value for the ‘schema’ key matches schema. Convert the value for that item’s ‘data’ key into the configuration’s Pydantic model identified by schema and return it. If no item is found and config and schema are specified, validate config against the configuration’s Pydantic model identified by schema and return it. Return config if no item is found and config is specified, but schema is not.

Parameters:
  • data (list[PipelineData], optional) – Input data.

  • config (dict, optional) – Initialization parameters for an instance of the Pydantic model identified by schema, required if data is unspecified, invalid, or does not contain an item that matches the schema; supersedes any equal parameters contained in data.

  • schema (str, optional) – Schema of the PipelineItem class to match in data, defaults to the internal PipelineItem schema attribute.

  • remove (bool, optional) – If there is a matching entry in data, remove it from the list, defaults to True.

Raises:

ValueError – If there’s no match for schema in data.

Returns:

Last matching validated configuration model.

Return type:

PipelineItem

static get_data(data, name=None, schema=None, remove=True)[source]

Look through data for the last item whose ‘data’ value is a NeXus style NXobject object or matches a given name or schema. Pick the last item for which the ‘name’ key matches name if set or the ‘schema’ key matches schema if set; otherwise pick the last match for a NXobject object. Return the data object.

Parameters:
  • data (list[PipelineData]) – Input data.

  • name (str, optional) – Name of the PipelineItem class to match in data.

  • schema (str | list[str], optional) – Schema of the PipelineItem class to match in data & return.

  • remove (bool, optional) – If there is a matching entry in data, remove it from the list, defaults to True.

Raises:

ValueError – If there’s no match for name or schema in data, or if there is no object of type nexusformat.nexus.NXobject.

Returns:

Last matching data item.

Return type:

Any
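The "last match wins" lookup can be sketched as follows. This is a simplified stand-in, not the get_data implementation: find_last is a hypothetical name, items are plain dicts, name takes precedence over schema by assumption, and the NXobject fallback is left out.

```python
def find_last(data, name=None, schema=None, remove=True):
    """Return the 'data' value of the last item matching name or schema."""

    def matches(item):
        if name is not None:
            return item.get("name") == name
        if schema is not None:
            return item.get("schema") == schema
        return False

    # Walk backwards so the most recently added match is found first.
    for index in range(len(data) - 1, -1, -1):
        if matches(data[index]):
            item = data.pop(index) if remove else data[index]
            return item["data"]
    raise ValueError(f"No item matches name={name!r} or schema={schema!r}")


results = [
    {"name": "a", "schema": "s", "data": 1},
    {"name": "b", "schema": "s", "data": 2},
]
latest = find_last(results, schema="s")  # also removes the matching item
print(latest, len(results))
```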

static get_default_nxentry(nxobject)[source]

Given a NeXus style NXroot object or a NeXus style NXentry object, return the default or first NXentry match.

Parameters:

nxobject (nexusformat.nexus.NXroot | nexusformat.nexus.NXentry) – Input data.

Raises:

ValueError – If unable to retrieve a NXentry object.

Returns:

Input data if a NXentry object or the default or first NXentry object if a NXroot object.

Return type:

nexusformat.nexus.NXentry

static get_nxroot(nxobject)[source]

Given a NeXus style NXroot object or a NeXus style NXentry object, return a NXroot object with the appropriate default path to the NXentry object set.

Parameters:

nxobject (nexusformat.nexus.NXroot | nexusformat.nexus.NXentry) – Input data.

Raises:

ValueError – If unable to retrieve a NXroot or NXentry object.

Returns:

Input data if a NXroot object, or a NXroot object with the input as its default NXentry object.

Return type:

nexusformat.nexus.NXroot

static get_pipelinedata_item(data, index=-1, remove=False)[source]

If ‘data’ is a list, retrieve the list item matching index and return its data value; otherwise return data itself.

Parameters:
  • data (Any | list[PipelineData]) – Input data.

  • index (int, optional) – List index of the item to retrieve from data, defaults to -1 (the last item in the list).

  • remove (bool, optional) – If there is a matching entry in data, remove it from the list, defaults to False.

Returns:

Matching data item.

Return type:

Any
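A minimal sketch of this behavior, assuming list items are dict-like with a 'data' key. get_item is a hypothetical name, not the CHAP method.

```python
def get_item(data, index=-1, remove=False):
    """Return the 'data' value at index for a list, else data itself."""
    if not isinstance(data, list):
        return data
    item = data.pop(index) if remove else data[index]
    return item["data"]


items = [{"data": "first"}, {"data": "second"}]
print(get_item(items))            # last item by default
print(get_item("not a list"))     # returned unchanged
```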

get_schema()[source]

Return the PipelineItem’s schema.

Type:

str

has_filename()[source]

Check whether the PipelineItem has a filename class attribute.

Returns:

True if the PipelineItem has a filename class attribute.

Return type:

bool

logger: Logger | None
property method

Return the PipelineItem’s read, process, or write method.

Type:

types.MethodType

property method_type

Return the PipelineItem’s execute method type.

Type:

Literal[‘read’, ‘process’, ‘write’]

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

name: Annotated[str, StringConstraints(strip_whitespace=True, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=None, pattern=None, ascii_only=None)] | None
property run_config

Return the PipelineItem’s run configuration.

Type:

RunConfig

schema_: Annotated[str, StringConstraints(strip_whitespace=True, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=None, pattern=None, ascii_only=None)] | None
set_args(**args)[source]

Set the PipelineItem’s execution method runtime arguments that are allowed by its method declaration.

Param:

PipelineItem’s execution method runtime arguments.

Type:

dict

property status

Return the PipelineItem’s status.

Type:

Literal[‘read’, ‘write_pending’, ‘written’]

static unwrap_pipelinedata(data)[source]

Given a list of PipelineData objects, return a list of their data values.

Parameters:

data (list[PipelineData]) – Input data to read, write, or process that needs to be unwrapped from PipelineData before use.

Returns:

‘data’ values of the items in the input data.

Return type:

list

validate_pipelineitem_after()[source]

Validate the PipelineItem configuration.

Returns:

Validated configuration.

Return type:

PipelineItem

CHAP.processor module

Module defining the base Processor class to derive all others from.

class OptionParser[source]

Bases: object

User based option parser.

class Processor(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]

Bases: PipelineItem

Base processor.

The job of any Processor in a pipeline is to receive data returned by a previous PipelineItem, process it in some way, and return the result for the following PipelineItems to use.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

process(data)[source]

Extract the contents of the input data, add a string to it, and return the amended value.

Parameters:

data (list[PipelineData]) – Input data.

Returns:

Processed data.

Return type:

str

classmethod validate_processor_before(data)[source]

Validate the Processor class attributes.

Parameters:

data (dict) – Pydantic validator data object.

Returns:

Currently validated class attributes.

Return type:

dict

main(opt_parser=<class 'CHAP.processor.OptionParser'>)[source]

Main function.

Parameters:

opt_parser (OptionParser) – User based option parser.

CHAP.reader module

Module defining the base Reader class to derive all others from.

class Reader(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]

Bases: PipelineItem

Base reader.

The job of any Reader in a pipeline is to provide data stored in a file to the list of PipelineItems.

Variables:

filename (str) – Name of file to read from.

filename: Annotated[str, StringConstraints(strip_whitespace=True, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=None, pattern=None, ascii_only=None)]
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

read()[source]

Read and return the contents of filename as text.

Returns:

File content.

Return type:

str

main(opt_parser=<class 'CHAP.reader._OptionParser'>)[source]

Main function.

Parameters:

opt_parser (CHAP.reader._OptionParser) – User based option parser.

validate_reader_model(reader)[source]

Validate the reader configuration.

Returns:

Validated model.

Return type:

Any

CHAP.runner module

Main functions to execute a ChessAnalysisPipeline (CHAP).

batch_runner(run_config, pipeline_config, log_file)[source]

Function for running a pipeline in batch mode with logging to file. Essentially a wrapper for the runner() function.

Parameters:
  • run_config (RunConfig) – CHAP run configuration.

  • pipeline_config (dict) – CHAP Pipeline configuration.

  • log_file (str) – Name of file for logging.

main()[source]

Main function.

parser()[source]

Return an argument parser for the CHAP command line interface (CLI). This parser accepts one required argument, the input CHAP configuration file name, and several optional arguments. Execute:

$ CHAP --help

from the command line for a description on how to use CHAP.
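A parser of this shape can be sketched with argparse. The option names below (--log-level, --interactive) and the file name pipeline.yaml are assumptions made for illustration; run CHAP --help for the options the real CLI accepts.

```python
import argparse


def build_parser():
    """Sketch of a parser with one required and some optional arguments."""
    parser = argparse.ArgumentParser(
        prog="CHAP", description="Run a pipeline from a configuration file")
    # The one required argument: the input configuration file name.
    parser.add_argument("config", help="Input configuration file name")
    # Hypothetical optional arguments, not taken from the real CLI.
    parser.add_argument(
        "--log-level", default="INFO", type=str.upper,
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Logger level (not case sensitive)")
    parser.add_argument(
        "--interactive", action="store_true",
        help="Allow user interactions")
    return parser


args = build_parser().parse_args(["pipeline.yaml", "--log-level", "debug"])
print(args.config, args.log_level, args.interactive)
```

Passing type=str.upper converts the value before it is checked against choices, which is one way to accept a level in any letter case.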

run(run_config, pipeline_config, logger=None, log_handler=None, comm=None)[source]

Run a given pipeline_config.

Parameters:
  • run_config (RunConfig) – CHAP run configuration.

  • pipeline_config (dict) – CHAP Pipeline configuration.

  • logger (logging.Logger, optional) – Logger.

  • log_handler (logging.StreamHandler, optional) – Logging handler.

  • comm (mpi4py.MPI.Comm, optional) – MPI communicator.

Returns:

data field of the first item in the returned list of pipeline items.

runner(run_config, pipeline_config, comm=None)[source]

Main runner function.

Parameters:
  • run_config (RunConfig) – CHAP run configuration.

  • pipeline_config (dict) – CHAP Pipeline configuration.

  • comm (mpi4py.MPI.Comm, optional) – MPI communicator.

Returns:

Pipeline’s returned data field.

set_logger(log_level='INFO')[source]

Helper function to set the CHAP logger.

Parameters:

log_level (str) – Logger level, defaults to “INFO”.

Returns:

Logger and logging handler.

Return type:

logging.Logger, logging.StreamHandler
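A helper returning a logger together with its stream handler might look like the sketch below. make_logger, the logger name, and the format string are assumptions; only the (logger, handler) return pair and the default level come from the description above.

```python
import logging


def make_logger(log_level="INFO", name="sketch"):
    """Create a logger with one stream handler and return both."""
    logger = logging.getLogger(name)
    logger.setLevel(log_level.upper())
    handler = logging.StreamHandler()
    # Assumed format: logger name padded to a fixed width, then the message.
    handler.setFormatter(logging.Formatter("%(name)-20s: %(message)s"))
    logger.addHandler(handler)
    return logger, handler


logger, handler = make_logger("debug")
logger.debug("logger configured")
```

Returning the handler alongside the logger lets the caller detach or replace it later, for example to redirect output to a file.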

CHAP.server module

Python server with thread pool and CHAP pipeline.

Client side

$ cat /tmp/chap.json
{"pipeline": [{"common.PrintProcessor": {}}], "input": 1}

Curl call to the server with CHAP pipeline

$ curl -X POST -H "Content-type: application/json" -d@/tmp/chap.json http://localhost:5000/pipeline
{"pipeline": [{"common.PrintProcessor":{}}], "status":"ok"}
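The same request can be built from Python with the standard library. This sketch only constructs the request, using the URL and payload from the curl example above; sending it requires a running server, so that call is left commented out.

```python
import json
import urllib.request

payload = {"pipeline": [{"common.PrintProcessor": {}}], "input": 1}

request = urllib.request.Request(
    "http://localhost:5000/pipeline",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-type": "application/json"},
    method="POST",
)

# Uncomment once the server is running:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
print(request.get_method(), request.full_url)
```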

Server side

$ flask --app server run
 * Serving Flask app 'server'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
Press CTRL+C to quit

CHAP output:

CHAP.server         : Call pipeline args=()
    kwds={'pipeline': [{'common.PrintProcessor': {}}]}
CHAP.server         : pipeline [{'common.PrintProcessor': {}}]
CHAP.server         : Loaded
    <CHAP.common.processor.PrintProcessor object at 0x10e0f1ed0>
CHAP.server         : Loaded
    <CHAP.pipeline.Pipeline object at 0x10e0f1f10> with 1 items
CHAP.server         : Calling "execute" on <CHAP.pipeline.Pipeline
    object at 0x10e0f1f10>
Pipeline            : Executing "execute"
Pipeline            : Calling "process" on
    <CHAP.common.processor.PrintProcessor object at 0x10e0f1ed0>
PrintProcessor      : Executing "process" with
    type(data)=<class 'NoneType'>
PrintProcessor data : None
PrintProcessor      : Finished "process" in 0.000 seconds
Pipeline            : Executed "execute" in 0.000 seconds
daemon(name, queue, interval)[source]

Daemon example based on Queue.

index_route()[source]

Server main end-point.

pipeline_route()[source]

Server /pipeline end-point.

run_route()[source]

Server main end-point.

task(*args, **kwds)[source]

Helper function to execute CHAP pipeline.

CHAP.taskmanager module

Python thread pool, see http://code.activestate.com/recipes/577187-python-thread-pool

class StoppableThread(target, name, args)[source]

Bases: Thread

Thread class with a stop() method. The thread itself has to check regularly for the stopped() condition.

running()[source]

Return running status of the thread.

stop()[source]

Set event to stop the thread.

stopped()[source]

Return stopped status of the thread.
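The cooperative stop pattern described for this class can be sketched with threading.Event. StoppableSketch is a hypothetical class with a simplified constructor, not the StoppableThread implementation; the point is that the loop itself polls stopped().

```python
import threading
import time


class StoppableSketch(threading.Thread):
    """Thread whose run loop checks a stop flag on every iteration."""

    def __init__(self, name="sketch"):
        super().__init__(name=name)
        self._stop_event = threading.Event()

    def stop(self):
        """Ask the thread to finish; it exits at its next check."""
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        while not self.stopped():
            # Do a small unit of work, then wait briefly for a stop request.
            self._stop_event.wait(0.01)


worker = StoppableSketch()
worker.start()
time.sleep(0.05)
worker.stop()
worker.join(timeout=1)
print(worker.is_alive())
```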

class TaskManager(nworkers=10, name='TaskManager')[source]

Bases: object

Task manager class based on the thread module, which executes assigned tasks concurrently. It uses a pool of thread workers, a queue of tasks, and a pid set to monitor job execution.

Use case:
mgr  = TaskManager()
jobs = []
jobs.append(mgr.spawn(func, args))
mgr.joinall(jobs)
clear(tasks)[source]

Clear all tasks in a queue. It allows current jobs to run, but will block all new requests until the workers’ event flag is set again.

is_alive(pid)[source]

Check the worker queue to see whether the process with the given pid is still running.

joinall(tasks)[source]

Join all tasks in a queue and quit.

nworkers()[source]

Return number of workers associated with this manager.

quit()[source]

Put None task to all workers and let them quit.

remove(pid)[source]

Remove pid and the associated process from the queue.

spawn(func, *args, **kwargs)[source]

Spawn new process for given function.

status()[source]

Return status of task manager queue.
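The pool-of-workers idea, including the None task used to make workers quit, can be sketched with the standard library. This is not the TaskManager implementation: worker_loop and run_tasks are hypothetical names, and pid tracking is omitted.

```python
import queue
import threading


def worker_loop(tasks, results):
    """Take tasks from the queue until a None sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:
            break
        func, args = task
        results.put(func(*args))


def run_tasks(jobs, nworkers=2):
    """Run (func, args) jobs on a small pool and collect the results."""
    tasks, results = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=worker_loop, args=(tasks, results))
        for _ in range(nworkers)
    ]
    for thread in threads:
        thread.start()
    for job in jobs:
        tasks.put(job)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker so each one quits
    for thread in threads:
        thread.join()
    collected = []
    while not results.empty():
        collected.append(results.get())
    return collected


print(sorted(run_tasks([(pow, (2, 3)), (pow, (3, 2)), (abs, (-1,))])))
```

Results arrive in completion order, which is why the example sorts them before printing.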

class UidSet[source]

Bases: object

UID holder keeps track of uid frequency.

add(uid)[source]

Add given uid or increment uid occurrence in a set.

discard(uid)[source]

Either discard or downgrade uid occurrence in a set.

get(uid)[source]

Get value for given uid.
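A frequency-tracking holder of this kind can be sketched with a dictionary of counts. UidCounter is a hypothetical name, and returning 0 for an unknown uid is an assumption rather than documented behavior.

```python
class UidCounter:
    """Keep a count per uid; discard decrements and removes at zero."""

    def __init__(self):
        self._counts = {}

    def add(self, uid):
        self._counts[uid] = self._counts.get(uid, 0) + 1

    def discard(self, uid):
        count = self._counts.get(uid, 0)
        if count > 1:
            self._counts[uid] = count - 1
        else:
            self._counts.pop(uid, None)

    def get(self, uid):
        return self._counts.get(uid, 0)


uids = UidCounter()
uids.add("job-1")
uids.add("job-1")
uids.discard("job-1")
print(uids.get("job-1"))
```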

class Worker(name, taskq, pidq, uidq, *, logger=None)[source]

Bases: Thread

Thread executing worker from a given tasks queue.

force_exit()[source]

Force run loop to exit in a hard way.

run()[source]

Run thread loop.

genkey(query)[source]

Generate a new key-hash for a given query. CHAP uses an MD5 hash for the query and the key is just the hex representation of this hash.
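The hashing step can be sketched with hashlib. make_key is a hypothetical name, and serializing non-string queries as JSON with sorted keys is an assumption about how a query could be made hashable, not a description of genkey.

```python
import hashlib
import json


def make_key(query):
    """Return the hex MD5 digest of a query."""
    if not isinstance(query, str):
        # Assumed normalization: sorted keys make the key order-independent.
        query = json.dumps(query, sort_keys=True)
    return hashlib.md5(query.encode("utf-8")).hexdigest()


print(make_key({"pipeline": "example", "input": 1}))
```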

set_thread_name(ident, name)[source]

Set the thread name for the given identifier.

start_new_thread(name, func, args, unique=False)[source]

Wrapper for standard thread.start_new_thread call.

CHAP.writer module

Module defining the base Writer class to derive all others from.

class Writer(*, root: Annotated[Path, PathType(path_type=dir)] | None = '/home/runner/work/ChessAnalysisPipeline/ChessAnalysisPipeline/docs', inputdir: Annotated[Path, PathType(path_type=dir)] | None = None, outputdir: Annotated[Path, PathType(path_type=dir)] | None = None, interactive: bool | None = False, log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | None = 'INFO')[source]

Bases: PipelineItem

Base writer.

The job of any Writer in a pipeline is to receive input returned by a previous PipelineItem and write its data to file in a particular file format.

Variables:
  • filename (str) – Name of file to write to.

  • force_overwrite (bool, optional) – Flag to allow data in filename to be overwritten if it already exists, defaults to False.

  • remove (bool, optional) – Flag to remove the dictionary from data, defaults to False.

filename: str
force_overwrite: bool | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

remove: bool | None
write(data)[source]

Write the last PipelineData item in data as text to a file.

Parameters:

data (list[PipelineData]) – Input data.
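The role of force_overwrite can be illustrated with a small sketch. write_text is a hypothetical helper, and raising FileExistsError when the file exists is an assumption about how a refusal could be reported, not the documented behavior of Writer.write.

```python
import tempfile
from pathlib import Path


def write_text(data, filename, force_overwrite=False):
    """Write data as text, refusing to replace an existing file by default."""
    path = Path(filename)
    if path.exists() and not force_overwrite:
        raise FileExistsError(f"{path} already exists")
    path.write_text(str(data), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmpdir:
    target = Path(tmpdir) / "out.txt"
    write_text("first", target)
    try:
        write_text("second", target)
    except FileExistsError:
        refused = True
    else:
        refused = False
    write_text("second", target, force_overwrite=True)
    final_text = target.read_text(encoding="utf-8")

print(refused, final_text)
```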

main(opt_parser=<class 'CHAP.writer._OptionParser'>)[source]

Main function.

Parameters:

opt_parser (CHAP.writer._OptionParser) – User based option parser.

validate_writer_model(writer)[source]

Validate the writer configuration.

Returns:

Validated model.

Return type:

Any