#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""Python server with thread pool and CHAP pipeline.
===========
Client side
===========
cat /tmp/chap.json
{"pipeline": [{"common.PrintProcessor": {}}], "input": 1}
==========================================
Curl call to the server with CHAP pipeline
==========================================
curl -X POST -H "Content-type: application/json" -d@/tmp/chap.json http://localhost:5000/pipeline
{"pipeline": [{"common.PrintProcessor":{}}], "status":"ok"}
===========
Server side
===========
flask --app server run
* Serving Flask app 'server'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production
deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
Press CTRL+C to quit
CHAP output::
CHAP.server : Call pipeline args=()
kwds={'pipeline': [{'common.PrintProcessor': {}}]}
CHAP.server : pipeline [{'common.PrintProcessor': {}}]
CHAP.server : Loaded
<CHAP.common.processor.PrintProcessor object at 0x10e0f1ed0>
CHAP.server : Loaded
<CHAP.pipeline.Pipeline object at 0x10e0f1f10> with 1 items
CHAP.server : Calling "execute" on <CHAP.pipeline.Pipeline
object at 0x10e0f1f10>
Pipeline : Executing "execute"
Pipeline : Calling "process" on
<CHAP.common.processor.PrintProcessor object at 0x10e0f1ed0>
PrintProcessor : Executing "process" with
type(data)=<class 'NoneType'>
PrintProcessor data : None
PrintProcessor : Finished "process" in 0.000 seconds
Pipeline : Executed "execute" in 0.000 seconds
"""
# System modules
import time
from queue import Queue
# Third-party modules
# Flask modules
from flask import Flask, request
# Local modules
from CHAP.taskmanager import (
TaskManager,
start_new_thread,
)
from CHAP.runner import (
run,
set_logger,
)
# Task manager to execute our tasks
taskManager = TaskManager()
# Flask Server
app = Flask(__name__)
# daemon task queue
task_queue = Queue()
[docs]
@app.route("/")
def index_route():
"""Server main end-point."""
return "CHAP daemon"
[docs]
@app.route("/run")
def run_route():
"""Server main end-point."""
ttask = request.args.get('task')
task_queue.put(ttask)
return f'Execute {ttask}'
[docs]
@app.route("/pipeline", methods=["POST"])
def pipeline_route():
"""Server /pipeline end-point."""
content = request.json
if 'pipeline' in content:
# spawn new pipeline task
jobs = []
jobs.append(taskManager.spawn(task, pipeline=content['pipeline']))
taskManager.joinall(jobs)
return {'status': 'ok', 'pipeline': content['pipeline']}
return {'status': 'fail', 'reason': 'no pipeline in incoming request'}
[docs]
def task(*args, **kwds):
"""Helper function to execute CHAP pipeline."""
log_level = 'INFO'
logger, log_handler = set_logger(log_level)
logger.info(f'call pipeline args={args} kwds={kwds}')
pipeline = kwds['pipeline']
logger.info(f'pipeline\n{pipeline}')
run(pipeline, logger, log_level, log_handler)
[docs]
def daemon(name, queue, interval):
"""Daemon example based on Queue."""
print(f'Daemon {name}')
while True:
if queue.qsize() == 0:
print('Default action')
time.sleep(interval)
else:
ttask = queue.get()
if ttask == 'exit':
return
print(f'daemon run {ttask}')
# start daemon thread in addition to Flask server
start_new_thread('daemon', daemon, ('daemon', task_queue, 3))