Source code for epivizfileserver.handler.HandlerNoActor

# from multiprocessing import Process, Manager, Lock
import pickle
# import threading
from .utils import create_parser_object
import os
from datetime import datetime, timedelta
import ujson
import pandas as pd
from aiocache import cached, Cache
from aiocache.serializers import JsonSerializer, PickleSerializer
# import logging
# import dask
from dask.distributed import Client

# dask.config.set({
#     'distributed.admin.tick.interval': '1000ms',
#     'distributed.worker.profile.interval': '1000ms'
# })

# logger = logging.getLogger(__name__)
from sanic.log import logger as logging

class FileHandlerProcess(object):
    """
    Class to manage query, transformation and cache using dask distributed

    Args:
        fileTime (int): time to keep file objects in memory
        MAXWORKER (int): maximum workers that can be used

    Attributes:
        records: a dictionary of all file objects
        client: asynchronous dask server client
    """
    def __init__(self, fileTime, MAXWORKER, client=None):
        self.records = {}
        self.client = client
        self.fileTime = fileTime
        self.IDcount = 0
    def setRecord(self, name, fileObj, fileType):
        """add or update `records` with new file object

        Args:
            name (str): file name
            fileObj: file object
            fileType: file type
        """
        self.records[name] = {"fileObj": fileObj, "time": datetime.now(),
                              "pickled": False, "pickling": False,
                              "ID": self.IDcount, "fileType": fileType}
        self.IDcount += 1
    async def getRecord(self, name):
        """get file object from `records` by name

        Args:
            name (str): file name

        Returns:
            file object
        """
        record = self.records.get(name)

        # spin until any in-progress pickling of this record has finished
        while record["pickling"]:
            pass

        if record["pickled"]:
            # restore the file object and its cache from the on-disk pickle,
            # then remove the cache file
            record["pickling"] = True
            record["pickled"] = False
            filehandler = open(os.getcwd() + "/cache/" + str(record["ID"]) + ".cache", "rb")
            cache = pickle.load(filehandler)
            fileType = record.get("fileType")
            fileClass = create_parser_object(fileType, name)
            fileObj = fileClass(name)
            record["fileObj"] = fileObj
            await fileObj.set_cache(cache)
            record["pickling"] = False
            filehandler.close()
            os.remove(os.getcwd() + "/cache/" + str(record["ID"]) + ".cache")

        record["time"] = datetime.now()
        return record.get("fileObj")
    def cleanFileOBJ(self):
        """automated task to pickle all file objects to disk
        """
        logging.debug("Handler: %s" % ("cleanFileObj"))
        tasks = []
        for fileName, record in self.records.items():
            if datetime.now() - record.get("time") > timedelta(seconds=self.fileTime) and not record.get("pickled"):
                tasks.append(self.pickleFileObject(fileName))
        return tasks
    async def pickleFileObject(self, fileName):
        """automated task to pickle a file object's cache to disk and
        release the in-memory file object

        Args:
            fileName: file name to pickle
        """
        logging.debug("Handler: %s\t%s" % (fileName, "pickleFileObject"))
        record = self.records.get(fileName)
        record["pickling"] = True
        record["pickled"] = True
        filehandler = open(os.getcwd() + "/cache/" + str(record["ID"]) + ".cache", "wb")
        cache = await record["fileObj"].get_cache()
        pickle.dump(cache, filehandler)
        filehandler.close()
        record["pickling"] = False
        record["fileObj"] = None
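
    # Note (illustrative, not part of the original module): cleanFileOBJ only
    # collects pickleFileObject coroutines; the caller is expected to schedule
    # and await them periodically, for example:
    #
    #     await asyncio.gather(*handler.cleanFileOBJ())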
[docs] @cached(ttl=None, cache=Cache.MEMORY, serializer=PickleSerializer(), namespace="handlefile") async def handleFile(self, fileName, fileType, chr, start, end, bins = 2000): """submit tasks to the dask client Args: fileName: file location fileType: file type chr: chromosome start: genomic start end: genomic end points: number of base-pairse to group per bin """ logging.debug("Handler: %s\t%s" %(fileName, "handleFile")) if self.records.get(fileName) == None: fileClass = create_parser_object(fileType, fileName) fileObj = fileClass(fileName) self.setRecord(fileName, fileObj, fileType) fileObj = await self.getRecord(fileName) fileFuture = self.client.submit(fileObj.getRange, chr, start, end, bins) data, err = await self.client.gather(fileFuture) return data, err
[docs] @cached(ttl=None, cache=Cache.MEMORY, serializer=PickleSerializer(), namespace="handlesearch") async def handleSearch(self, fileName, fileType, query, maxResults): """submit tasks to the dask client Args: fileName: file location fileType: file type chr: chromosome start: genomic start end: genomic end """ logging.debug("Handler: %s\t%s" %(fileName, "handleSearch")) if self.records.get(fileName) == None: fileClass = create_parser_object(fileType, fileName) fileObj = fileClass(fileName) self.setRecord(fileName, fileObj, fileType) fileObj = await self.getRecord(fileName) fileFuture = self.client.submit(fileObj.search_gene, query, maxResults) data, err = await self.client.gather(fileFuture) return data, err
[docs] @cached(ttl=None, cache=Cache.MEMORY, serializer=PickleSerializer(), namespace="binfile") async def binFileData(self, fileName, data, chr, start, end, bins, columns, metadata): """submit tasks to the dask client """ logging.debug("Handler: %s\t%s" %(fileName, "handleBinData")) fileObj = await self.getRecord(fileName) fileFuture = self.client.submit(fileObj.bin_rows, data, chr, start, end, columns=columns, metadata=metadata, bins=bins) data, err = await self.client.gather(fileFuture) return data, err