#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Module for Readers unique to the
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
integration with CHAP.
"""
# System modules
import json
from typing import Optional
# Third party modules
from pydantic import (
Field,
model_validator,
)
# Local modules
from CHAP.foxden.models import FoxdenRequestConfig
from CHAP.foxden.utils import HTTP_request
from CHAP.pipeline import PipelineItem
from CHAP.processor import Processor
[docs]
class FoxdenDataDiscoveryReader(PipelineItem):
"""Reader for the
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
Data Discovery service.
:ivar config: Initialization parameters for an instance of
:class:`~CHAP.foxden.models.FoxdenRequestConfig`.
:vartype config: dict, optional
"""
pipeline_fields: dict = Field(
default = {'config': 'foxden.models.FoxdenRequestConfig'},
init_var=True)
config: Optional[FoxdenRequestConfig] = FoxdenRequestConfig()
_validate_config = model_validator(mode='before')(
Processor.validate_processor_before)
[docs]
@model_validator(mode='after')
def validate_foxdendatadiscoveryreader_after(self):
"""Validate the `FoxdenDataDiscoveryReader` configuration.
:return: Validated model configuration
:rtype: FoxdenDataDiscoveryReader
"""
assert self.config.url is not None
return self
[docs]
def read(self):
"""Read records from the
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
Data Discovery service based on
DID (Dataset Identifier) or an arbitrary query.
:return: Discovered data records.
:rtype: list
"""
self.logger.debug(f'config: {self.config}')
# Submit HTTP request and return response
rurl = f'{self.config.url}/search'
payload = self.config.create_http_request_payload(self)
self.logger.info(f'method=POST url={rurl} payload={payload}')
response = HTTP_request(rurl, payload, method='POST', scope='read')
if self.config.verbose:
self.logger.info(
f'code={response.status_code} data={response.text}')
if response.status_code == 200:
result = json.loads(response.text)
self.logger.debug(f'Returning {len(result)} records')
return result
self.logger.warning(f'HTTP error code {response.status_code}')
self.logger.warning(
f'HTTP response:\n{response.__dict__}\npayload:\n{payload}')
return []
[docs]
class FoxdenProvenanceReader(PipelineItem):
"""Reader for the
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
Provenance service.
:ivar config: Initialization parameters for an instance of
:class:`~CHAP.foxden.models.FoxdenRequestConfig`.
:vartype config: dict, optional
"""
pipeline_fields: dict = Field(
default = {'config': 'foxden.models.FoxdenRequestConfig'},
init_var=True)
config: Optional[FoxdenRequestConfig] = FoxdenRequestConfig()
_validate_config = model_validator(mode='before')(
Processor.validate_processor_before)
[docs]
@model_validator(mode='after')
def validate_foxdenprovenancereader_after(self):
"""Validate the `FoxdenProvenanceReader` configuration.
:return: Validated model configuration
:rtype: FoxdenProvenanceReader
"""
if self.get_schema() is None:
self.schema_ = 'foxden.reader.FoxdenProvenanceReader'
assert self.config.url is not None
return self
[docs]
def read(self):
"""Read records from the
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
Provenance service based on did or an arbitrary query.
:return: Provenance input and output file records.
:rtype: list
"""
# FIX right now the provenance reader only returns input and
# output files, not the did. So you always also have to read a
# metadata record
self.logger.debug(f'config: {self.config}')
# Submit HTTP request and return response
rurl = f'{self.config.url}/files?did={self.config.did}'
payload = self.config.create_http_request_payload(self)
self.logger.info(f'method=GET url={rurl} payload={payload}')
response = HTTP_request(rurl, payload, method='GET', scope='read')
if self.config.verbose:
self.logger.info(
f'code={response.status_code} data={response.text}')
if response.status_code == 200:
result = [{'name': v['name'], 'file_type': v['file_type']}
for v in json.loads(response.text)]
self.logger.debug(f'Returning {len(result)} records')
return result
self.logger.warning(f'HTTP error code {response.status_code}')
self.logger.warning(
f'HTTP response:\n{response.__dict__}\npayload:\n{payload}')
return []
[docs]
class FoxdenSpecScansReader(PipelineItem):
"""Reader for `FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
SpecScans data from a specific FOXDEN SpecScans service.
"""
[docs]
def read(
self, url, data, *, did='', query='', spec=None, method='POST',
# 'GET',
verbose=False):
"""Read and return data from a specific
`FOXDEN <https://github.com/CHESSComputing/FOXDEN>`__
SpecScans service.
:param url: URL of service.
:type url: str
:param data: Input data.
:type data: list[PipelineData]
:param did: FOXDEN dataset identifier (DID).
:type did: string, optional
:param query: FOXDEN query.
:type query: string, optional
:param spec: FOXDEN spec.
:type spec: dictionary, optional
:param method: HTTP method to use, `'POST'` for creation or
`'PUT'` for update, defaults to `'POST'`.
:type method: str, optional
:param verbose: Verbose output flag, defaults to `False`.
:type verbose: bool, optional
:return: Contents of the input data.
:rtype: list
"""
# TODO FIX
self.logger.info(
f'Executing "process" with url={url} data={data} did={did}')
rurl = f'{url}/search'
request = {'client': 'CHAP-FoxdenSpecScansReader', 'service_query': {}}
if did:
request['service_query'].update({'spec': {'did': did}})
if query:
request['service_query'].update({'query': query})
if spec:
request['service_query'].update({'spec': spec})
payload = json.dumps(request)
if verbose:
self.logger.info(f'method={method} url={rurl} payload={payload}')
response = HTTP_request(rurl, payload, method=method)
if verbose:
self.logger.info(
f'code={response.status_code} data={response.text}')
if response.status_code == 200:
data = json.loads(response.text)
else:
data = []
return data
if __name__ == '__main__':
# Local modules
from CHAP.reader import main
main()