from sanic import Sanic, response
from sanic.log import logger as logging
from ..handler import FileHandlerProcess
# import asyncio
import ujson
from .request import create_request, StatusRequest
from sanic_cors import CORS, cross_origin
import os
import sys
import asyncio
from tornado.platform.asyncio import BaseAsyncIOLoop, to_asyncio_future
from dask.distributed import Client
# import logging
import time
import traceback
app = Sanic(__name__)
CORS(app)
fileTime = 4
MAXWORKER = 10
# logger = logging.getLogger(__name__)
# logging.basicConfig(filename= os.getcwd() + 'efs.log',
# format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
# level=logging.DEBUG)
# for handler in logging.root.handlers[:]:
# logging.root.removeHandler(handler)
"""
The server module allows users to instantly create a REST API from the list of measuremensts.
The API can then be used to interactive exploration of data or build various applications.
"""
def setup_app(measurementsManager, dask_scheduler=None):
    """Setup the Sanic Rest API.

    Args:
        measurementsManager: a measurements manager object
        dask_scheduler: optional address of a dask scheduler; when None a
            local dask client is created on server start

    Returns:
        the configured sanic app object
    """
    global app
    app.epivizMeasurementsManager = measurementsManager
    # the real file handler is created lazily in the after_server_start listener
    app.epivizFileHandler = None
    app.dask_scheduler = dask_scheduler
    logging.info("Initialized Setup App")
    return app
def create_fileHandler():
    """Reset the global dask file handler placeholder.

    NOTE(review): despite the name, this does not construct a
    FileHandlerProcess — it only clears the attribute; the actual handler
    is created in the 'after_server_start' listener.

    Returns:
        None (the cleared ``app.epivizFileHandler`` attribute)
    """
    global app
    app.epivizFileHandler = None
    return app.epivizFileHandler
async def schedulePickle():
    """Sanic background task to regularly pickle file objects from memory.

    Every ``fileTime`` seconds, asks the file handler for a queue of cleanup
    coroutines and schedules each one on the running event loop. Runs
    forever; intended to be registered with ``app.add_task``.
    """
    while True:
        await asyncio.sleep(fileTime)
        clean_queue = app.epivizFileHandler.cleanFileOBJ()
        for cleanup_coro in clean_queue:
            asyncio.ensure_future(cleanup_coro)
        logging.info("Updating cache, pickle file objects")
@app.listener('before_server_start')
async def setup_connection(app, loop):
    """Sanic callback for app setup before the server starts.

    Ensures the on-disk cache folder exists next to the current working
    directory.
    """
    logging.info('Server successfully started!')
    # original checked cwd + "/cache" but created a relative 'cache';
    # use the same absolute path for both check and creation
    cache_dir = os.path.join(os.getcwd(), "cache")
    if not os.path.exists(cache_dir):
        os.mkdir(cache_dir)
@app.listener('after_server_start')
async def setup_after_connection(app, loop):
    """Sanic callback run after server start.

    Connects to dask (a remote scheduler if one was configured in
    ``setup_app``, otherwise a local asynchronous client), creates the
    shared FileHandlerProcess, and wires it into every file-backed or
    computed measurement.
    """
    logging.info("after server start")
    # wrap sanic's asyncio loop so dask's local client can reuse it
    ioloop = BaseAsyncIOLoop(loop)
    logging.info("after ioloop")
    logging.info("setting up dask client with scheduler {}".format(app.dask_scheduler))
    if app.dask_scheduler is None:
        app.client = await Client(asynchronous=True, nanny=False, loop=ioloop)
    else:
        app.client = await Client(address=app.dask_scheduler, asynchronous=True)
    # log instead of the original bare print(app.client)
    logging.info("setup client: %s", app.client)
    app.epivizFileHandler = FileHandlerProcess(fileTime, MAXWORKER)
    app.epivizFileHandler.client = app.client
    # hand the shared file handler to all file/computed measurements
    for rec in app.epivizMeasurementsManager.get_measurements():
        if rec.datasource == "files" or rec.datasource == "computed":
            rec.fileHandler = app.epivizFileHandler
    logging.info("FileHandler created")
    logging.info("starting client")
@app.listener('before_server_stop')
async def clean_up(app, loop):
    """Sanic callback before the server stops.

    Deletes every file in the cache folder (best effort — failures are
    printed and skipped), then closes the dask client once.
    """
    folder = os.getcwd() + "/cache/"
    for the_file in os.listdir(folder):
        file_path = os.path.join(folder, the_file)
        try:
            if os.path.isfile(file_path):
                os.unlink(file_path)
        except Exception as e:
            # best effort: keep removing the remaining cache files
            print(e)
    logging.info("cache cleaned")
    # BUG FIX: the original awaited app.client.close() inside the per-file
    # loop, closing the dask client right after the first file was removed;
    # close it exactly once, after the cache is cleared
    try:
        await app.client.close()
    except Exception as e:
        print(e)
@app.route("/", methods=["POST", "OPTIONS", "GET"])
async def process_request(request):
"""
Process am API request
Args:
request: a sanic request object
Returns:
a JSON result
"""
param_action = request.args.get("action")
param_id = request.args.get("requestId")
version = request.args.get("version")
logging.debug("Request received: %s" %(request.args))
start = time.time()
epiviz_request = create_request(param_action, request.args)
result, error = await epiviz_request.get_data(request.app.epivizMeasurementsManager)
logging.debug("Request total time: %s" %(time.time() - start))
logging.debug("Request processed: %s" %(param_id))
return response.json({"requestId": int(param_id),
"type": "response",
"error": error,
"data": result,
"version": 5
},
status=200)
@app.route("/addData", methods=["POST", "OPTIONS", "GET"])
async def process_request(request):
"""
API Endpoint to add new datasets to an instance
API Params:
file: location of the json or hub file
filetype: 'hub' if trackhub or 'json' if configuration file
Args:
request: a sanic request object
Returns:
success/fail after adding measurements
"""
file = request.args.get("file")
type = request.args.get("filetype")
if type is "json":
request.app.epivizMeasurementsManager.import_files(file, request.app.epivizFileHandler)
elif type is "hub":
request.app.epivizMeasurementsManager.import_trackhub(file, request.app.epivizFileHandler)
return response.json({"requestId": int(param_id),
"type": "response",
"error": None,
"data": True,
"version": 5
},
status=200)
@app.route("/status", methods=["GET"])
async def process_request(request):
# response = {
# "requestId": -1,
# "type": "response",
# "error": None,
# "version": 5,
# "data": {
# "message": "EFS up",
# "stats": request.app.epivizMeasurementsManager.stats
# }
# }
return response.json({
"requestId": -1,
"type": "response",
"error": None,
"version": 5,
"data": {
"message": "EFS up",
"stats": request.app.epivizMeasurementsManager.stats
}
}, status=200)
@app.route("/status/<datasource>", methods=["GET"])
async def process_request(request, datasource):
epiviz_request = StatusRequest(request, datasource)
result, error = await epiviz_request.get_status(request.app.epivizMeasurementsManager)
result = "ok: {} bytes read".format(result) if result > 0 else "fail"
res = {
"requestId": -1,
"type": "response",
"error": error,
"version": 5,
"data": {
"message": "check status of datasource " + datasource + ": " + result
}
}
if datasource in request.app.epivizMeasurementsManager.stats["getRows"]:
data = request.app.epivizMeasurementsManager.stats["getRows"][datasource]
res["data"]["getRows"] = data
mean = data["sum"] / data["count"]
res["data"]["SD"] = (data["sumSquares"] + (data["count"] * (mean ** 2)) - 2 * mean * data["sum"])/data["count"]
if datasource in request.app.epivizMeasurementsManager.stats["getValues"]:
data = request.app.epivizMeasurementsManager.stats["getValues"][datasource]
res["data"]["getValues"] = data
mean = data["sum"] / data["count"]
res["data"]["SD"] = (data["sumSquares"] + (data["count"] * (mean ** 2)) - 2 * mean * data["sum"])/data["count"]
if datasource in request.app.epivizMeasurementsManager.stats["search"]:
data = request.app.epivizMeasurementsManager.stats["search"][datasource]
res["data"]["search"] = data
mean = data["sum"] / data["count"]
res["data"]["SD"] = (data["sumSquares"] + (data["count"] * (mean ** 2)) - 2 * mean * data["sum"])/data["count"]
return response.json(res, status=200)