Source code for epivizfileserver.measurements.measurementManager

from aiocache import cached, SimpleMemoryCache
from aiocache.serializers import JsonSerializer
import pandas as pd
from .measurementClass import DbMeasurement, FileMeasurement, ComputedMeasurement
from ..trackhub import TrackHub
from ..parser import GtfParsedFile, TbxFile, BigBed
import ujson
import requests
import pandas as pd

from sanic.log import logger as logging

class EMDMeasurementMap(object):
    """Manage the mapping between measurements in EFS and the epiviz-md metadata service.

    Attributes:
        emd_endpoint: base url of the epiviz-md api
        handler: the file handler passed at construction (only stored here)
        collections: map { emd database id => collection id }
        measurement_map: map { emd database id => efs measurement id }
    """

    def __init__(self, url, fileHandler):
        self.emd_endpoint = url
        self.handler = fileHandler
        # collection records from emd
        self.collections = dict()
        # map { emd id => efs measurement id }
        self.measurement_map = dict()

    def _fetch_json(self, req_url, error_prefix):
        """GET `req_url` and return the decoded json body.

        Args:
            req_url: full url to request
            error_prefix: text placed before the url in the error message

        Raises:
            Exception: if the response status code is not 200
        """
        r = requests.get(req_url)
        if r.status_code != 200:
            raise Exception("{} {}: {}".format(error_prefix, req_url, r.text))
        return r.json()

    def init(self):
        """Load all collections and measurements from emd.

        Returns:
            the list of measurement records, already rewritten by
            `process_emd_record`
        """
        logging.debug("Initializing from emd at {}".format(self.emd_endpoint))
        self.init_collections()
        records = self.init_measurements()
        logging.debug("Done initializing from emd")
        return records

    def init_collections(self):
        """Fill `self.collections` from the `/collections/` endpoint."""
        req_url = self.emd_endpoint + "/collections/"
        logging.debug("Initializing collections from emd")
        collection_records = self._fetch_json(
            req_url, "Error initializing collections from emd")
        for rec in collection_records:
            # map database id to efs id
            self.collections[rec['id']] = rec['collection_id']
        logging.debug("Done initializing collections from emd")

    def process_emd_record(self, rec):
        """Rewrite an emd measurement record in place into the shape EFS imports.

        The epiviz-md api returns an 'id' which is the database id; the record
        id is replaced by its 'measurement_id' and the database id is remembered
        in `self.measurement_map`. The 'collection_id' key is removed and the
        collection's id (looked up in `self.collections`) is stored under the
        'collection' key of the record annotation.

        Args:
            rec: a measurement record (dict) as returned by the emd api

        Raises:
            KeyError: if the record's collection is not in `self.collections`
        """
        self.measurement_map[rec['id']] = rec['measurement_id']
        rec['id'] = rec.pop('measurement_id')

        collection_name = self.collections[rec.pop('collection_id')]

        current_annotation = rec['annotation']
        if current_annotation is None:
            current_annotation = {"collection": collection_name}
        else:
            current_annotation['collection'] = collection_name
        rec['annotation'] = current_annotation

    def init_measurements(self):
        """Fetch every measurement record from `/ms/` and process each one.

        Returns:
            the list of processed records
        """
        req_url = self.emd_endpoint + "/ms/"
        logging.debug("Initializing measurements from emd")
        records = self._fetch_json(
            req_url, "Error initializing measurements from emd")
        for rec in records:
            self.process_emd_record(rec)
        logging.debug("Done initializing measurements")
        return records

    def sync(self, current_ms):
        """Bring the local maps and `current_ms` in line with emd.

        Args:
            current_ms: container of current measurements; entries whose
                measurement is no longer in emd are deleted with `del`

        Returns:
            records of measurements that are new since the last sync
        """
        logging.debug("Syncing with emd at {}".format(self.emd_endpoint))

        # this will remove deleted collections from
        # the collection id map
        new_collections = self.sync_collections()
        new_records_from_collections = self.add_new_collections(new_collections)

        # this will remove measurements in current_ms
        # no longer in the emd db
        new_measurements = self.sync_measurements(current_ms)
        new_records = self.add_new_measurements(new_measurements)

        logging.debug("Done syncing with emd")
        return new_records_from_collections + new_records

    def sync_collections(self):
        """Drop collections no longer in emd.

        Returns:
            list of collection ids present in emd but not known locally
        """
        req_url = self.emd_endpoint + "/collections/ids"
        logging.debug("Syncing collections from emd")
        emd_ids = set(self._fetch_json(
            req_url, "Error getting collection ids to sync from emd"))

        new_ids = list(emd_ids - set(self.collections.values()))
        # collect keys first: the dict cannot be mutated while iterating it
        stale_keys = [k for k, v in self.collections.items() if v not in emd_ids]
        for key in stale_keys:
            del self.collections[key]
        return new_ids

    def add_new_collections(self, new_collection_ids):
        """Register new collections and fetch the measurements they contain.

        Args:
            new_collection_ids: collection ids to fetch from emd

        Returns:
            list of processed measurement records from all new collections
        """
        logging.debug("Adding new collections from emd")
        all_records = []

        for collection_id in new_collection_ids:
            req_url = self.emd_endpoint + "/collections/" + collection_id
            collection = self._fetch_json(
                req_url,
                "Error getting collection with id {} from".format(collection_id))

            # map emd db id to efs id
            self.collections[collection['id']] = collection['collection_id']
            logging.debug("Added new collection {} from emd".format(collection['collection_id']))

            logging.debug("Adding measurements from collection {} from emd".format(collection['collection_id']))
            req_url = self.emd_endpoint + "/collections/" + collection_id + "/ms"
            records = self._fetch_json(
                req_url,
                "Error getting records for collection with id {} from".format(collection_id))
            for rec in records:
                self.process_emd_record(rec)

            logging.debug("Done adding measurements from new collection")
            all_records.extend(records)

        logging.debug("Done adding new collections from emd")
        return all_records

    def sync_measurements(self, current_ms):
        """Drop measurements no longer in emd.

        Args:
            current_ms: container of current measurements; stale entries are
                removed from it with `del current_ms[ms_id]`

        Returns:
            list of measurement ids present in emd but not known locally
        """
        req_url = self.emd_endpoint + "/ms/ids"
        logging.debug("Syncing measurements from emd")
        ms_ids = set(self._fetch_json(
            req_url, "Error getting ms ids to sync from emd"))

        new_ids = list(ms_ids - set(self.measurement_map.values()))
        # collect keys first: the dict cannot be mutated while iterating it
        stale_keys = [k for k, v in self.measurement_map.items() if v not in ms_ids]
        for key in stale_keys:
            ms_id = self.measurement_map[key]
            del current_ms[ms_id]
            del self.measurement_map[key]
        return new_ids

    def add_new_measurements(self, new_ms_ids):
        """Fetch and process the records of the given measurement ids.

        Args:
            new_ms_ids: measurement ids to fetch from emd

        Returns:
            list of processed measurement records
        """
        logging.debug("Adding new ms from emd")
        all_records = []

        for ms_id in new_ms_ids:
            req_url = self.emd_endpoint + "/ms/" + ms_id
            rec = self._fetch_json(
                req_url, "Error getting ms with id {} from".format(ms_id))
            self.process_emd_record(rec)
            all_records.append(rec)

        logging.debug("Done adding new ms from emd")
        return all_records
class MeasurementSet(object):
    """Container of measurements keyed by their `mid` attribute."""

    def __init__(self):
        self.measurements = {}

    def append(self, ms):
        """Store `ms` under `ms.mid`, replacing any entry with the same id."""
        self.measurements[ms.mid] = ms

    def __delitem__(self, key):
        # deleting an unknown id is logged, never raised
        try:
            del self.measurements[key]
        except KeyError:
            logging.debug("Tried to del ms {}: not found".format(key))

    def get(self, key):
        """Return the measurement stored under `key`, or None if absent."""
        return self.measurements.get(key)

    def get_measurements(self):
        """Return a view of all stored measurements."""
        return self.measurements.values()

    def get_mids(self):
        """Return a view of all stored measurement ids."""
        return self.measurements.keys()
class MeasurementManager(object):
    """Measurement manager class

    Attributes:
        genomes: dict { genome name => parsed genome file }
        measurements: `MeasurementSet` of all measurements managed by the system
        emd_endpoint: url of the epiviz-md api, set by `import_emd` when listening
        emd_map: `EMDMeasurementMap` set by `use_emd`, otherwise None
        tiledb: list, initialised empty
        stats: dict with one (initially empty) dict per request type
    """

    def __init__(self):
        self.genomes = {}
        self.measurements = MeasurementSet()
        self.emd_endpoint = None
        self.emd_map = None
        self.tiledb = []
        self.stats = {
            "getRows": {},
            "getValues": {},
            "search": {}
        }

    def import_dbm(self, dbConn):
        """Import measurements from a database. The database needs to have a
        `measurements_index` table with information of files imported into the
        database.

        Args:
            dbConn: a database connection
        """
        query = "select * from measurements_index"
        with dbConn.cursor() as cursor:
            cursor.execute(query)
            result = cursor.fetchall()

        for rec in result:
            isGene = "genes" in rec["location"]

            annotation = None
            if rec["annotation"] is not None:
                annotation = ujson.loads(rec["annotation"])

            tempDbM = DbMeasurement("db", rec["column_name"], rec["measurement_name"],
                                    rec["location"], rec["location"], dbConn=dbConn,
                                    annotation=annotation,
                                    metadata=ujson.loads(rec["metadata"]),
                                    isGenes=isGene)
            self.measurements.append(tempDbM)

    def import_files(self, fileSource, fileHandler=None, genome=None):
        """Import measurements from a file.

        Args:
            fileSource: location of the configuration file to load
            fileHandler: an optional filehandler to use
            genome: genome build used for records that do not declare one
        """
        with open(fileSource, 'r') as f:
            records = ujson.loads(f.read())
        self.import_records(records, fileHandler=fileHandler, genome=genome)

    def import_records(self, records, fileHandler=None, genome=None):
        """Import measurements from a list of records (usually from a decoded json string)

        Args:
            records: list of record dicts; each needs `datatype` and `file_type`
            fileHandler: an optional filehandler to use
            genome: genome build used for records that do not declare one

        Returns:
            list of the measurements created from `records`

        Raises:
            Exception: if a record has no genome and `genome` is None
        """
        measurements = []
        num_records = len(records)

        for i, rec in enumerate(records):
            format_args = {
                "i": i,
                "num_records": num_records,
                "datatype": rec['datatype'],
                "file_type": rec['file_type']
            }
            logging.debug("Importing record {i}/{num_records} with datatype {datatype} and file type {file_type}".format(**format_args))

            isGene = "annotation" in rec["datatype"]

            tgenome = rec.get("genome")
            if tgenome is None:
                tgenome = genome
            if tgenome is None:
                raise Exception("all files must be annotated with its genome build")

            file_type = rec.get("file_type")

            if file_type == "tiledb":
                # its expression dataset: one measurement per sample (row of `/cols`)
                samples = pd.read_csv(rec.get("url") + "/cols", sep="\t", index_col=0)
                sample_names = samples.index.values

                rows = pd.read_csv(rec.get("url") + "/rows", sep="\t", index_col=False, nrows=10)
                metadata = [m for m in rows.columns.values if m not in ['chr', 'start', 'end']]

                # a record without annotation must not crash the import
                rec_annotation = rec.get("annotation") or {}

                for samp, (index, row) in zip(sample_names, samples.iterrows()):
                    anno = row.to_dict()
                    anno["_filetype"] = file_type
                    # record-level annotation wins over per-sample columns
                    anno.update(rec_annotation)

                    tempFileM = FileMeasurement(file_type, samp,
                                                samp + "_" + rec.get("name"),
                                                rec.get("url"), genome=tgenome,
                                                annotation=anno, metadata=metadata,
                                                minValue=0, maxValue=20,
                                                isGenes=isGene, fileHandler=fileHandler)
                    measurements.append(tempFileM)
                    self.measurements.append(tempFileM)
            else:
                anno = rec.get("annotation")
                if anno is None:
                    anno = {}

                if file_type.lower() in ["gwas", "bigbed"]:
                    # metadata comes from the file's autosql; the first three
                    # entries are skipped
                    bw = BigBed(rec.get("url"))
                    metadata = bw.get_autosql()
                    if metadata and len(metadata) > 3:
                        metadata = metadata[3:]
                    else:
                        metadata = []
                else:
                    metadata = rec.get("metadata")

                anno["_filetype"] = file_type

                tempFileM = FileMeasurement(file_type, rec.get("id"), rec.get("name"),
                                            rec.get("url"), genome=tgenome,
                                            annotation=anno, metadata=metadata,
                                            minValue=0, maxValue=5,
                                            isGenes=isGene, fileHandler=fileHandler)
                measurements.append(tempFileM)
                self.measurements.append(tempFileM)

        return measurements

    def import_ahub(self, ahub, handler=None):
        """Import measurements from annotationHub objects.

        Only rows whose `preparerclass` contains "EpigenomeRoadMapPreparer"
        are imported.

        Args:
            ahub: list of file records from annotationHub
            handler: an optional filehandler to use

        Returns:
            list of the measurements created
        """
        # forward the handler only when one was given, so the measurement's
        # own default is kept otherwise
        extra = {} if handler is None else {"fileHandler": handler}

        measurements = []
        for i, row in ahub.iterrows():
            if "EpigenomeRoadMapPreparer" in row["preparerclass"]:
                tempFile = FileMeasurement(row["source_type"], row["ah_id"],
                                           row["title"], row["sourceurl"], **extra)
                self.measurements.append(tempFile)
                measurements.append(tempFile)
        return measurements

    def get_from_emd(self, url=None):
        """Make a GET request to a metadata api

        Args:
            url: the url of the epiviz-md api. If none the url on self.emd_endpoint is used if available (None)

        Returns:
            list of measurement records, rewritten so that `id` holds the
            measurement id and the annotation carries a `collection` entry

        Raises:
            Exception: if no url is available or a request does not return 200
        """
        if url is None:
            url = self.emd_endpoint

        if url is None:
            raise Exception("Error reading measurements from emd endpoint: missing url")

        req_url = url + "/collections/"
        r = requests.get(req_url)
        if r.status_code != 200:
            raise Exception("Error getting collections from emd {}".format(req_url))

        collections = {rec['id']: rec['collection_id'] for rec in r.json()}

        req_url = url + "/ms/"
        r = requests.get(req_url)
        if r.status_code != 200:
            raise Exception("Error importing measurements from emd {}: {}".format(req_url, r.text))

        records = r.json()

        # this is not elegant but... the epiviz-md api returns an 'id' which is the
        # database id, we want the id of the record to be the 'measurement_id' as returned
        # by the epiviz-md api endpoint, so let's do that bit of surgery
        for rec in records:
            rec['id'] = rec.pop('measurement_id')
            collection_name = collections[rec.pop('collection_id')]

            current_annotation = rec['annotation']
            if current_annotation is None:
                current_annotation = {"collection": collection_name}
            else:
                current_annotation['collection'] = collection_name
            rec['annotation'] = current_annotation

        return records

    def use_emd(self, url, fileHandler=None):
        """Delegate all getMeasurement calls to an epiviz-md metadata service api

        Args:
            url: the url of the epiviz-md api
            fileHandler: an optional filehandler to use
        """
        logging.debug("Will be using emd api at {}".format(url))
        self.emd_map = EMDMeasurementMap(url, fileHandler)
        records = self.emd_map.init()
        self.import_records(records, fileHandler=fileHandler)

    def import_emd(self, url, fileHandler=None, listen=True):
        """Import measurements from an epiviz-md metadata service api.

        Args:
            url: the url of the epiviz-md api
            fileHandler: an optional filehandler to use
            listen: activate 'updateCollections' endpoint to add measurements from the service upon request
        """
        if listen:
            self.emd_endpoint = url

        records = self.get_from_emd(url)
        self.import_records(records, fileHandler=fileHandler)

    def add_computed_measurement(self, mtype, mid, name, measurements, computeFunc,
                                 genome=None, annotation=None, metadata=None,
                                 computeAxis=1):
        """Add a Computed Measurement

        Args:
            mtype: measurement type, defaults to 'computed'
            mid: measurement id
            name: name for this measurement
            measurements: list of measurement to use
            computeFunc: `NumPy` function to apply
            genome: passed through to `ComputedMeasurement`
            annotation: passed through to `ComputedMeasurement`
            metadata: passed through to `ComputedMeasurement`
            computeAxis: passed through to `ComputedMeasurement`

        Returns:
            a `ComputedMeasurement` object
        """
        tempComputeM = ComputedMeasurement(mtype, mid, name,
                                           measurements=measurements,
                                           computeFunc=computeFunc, genome=genome,
                                           annotation=annotation, metadata=metadata,
                                           computeAxis=computeAxis)
        self.measurements.append(tempComputeM)
        return tempComputeM

    def add_genome(self, genome, url="http://obj.umiacs.umd.edu/genomes/", type=None, fileHandler=None):
        """Add a genome to the list of measurements. The genome has to be tabix indexed for the file server
        to make remote queries. Our tabix indexed files are available at https://obj.umiacs.umd.edu/genomes/index.html

        Args:
            genome: for example : hg19 if type = "tabix" or full location of gtf file if type = "gtf"
            url: url to the genome file; for type = "tabix" it is the base url
                under which `<genome>/<genome>.txt.gz` is looked up
            type: one of "tabix", "efs-tsv" or "gtf"; any other value adds nothing
            fileHandler: an optional filehandler to use

        Returns:
            the `FileMeasurement` created, or None for an unrecognised type
        """
        # settings shared by every genome measurement
        gene_kwargs = dict(
            annotation={"group": "genome"},
            metadata=["geneid", "exons_start", "exons_end", "gene"],
            minValue=0, maxValue=5,
            isGenes=True, fileHandler=fileHandler,
            columns=["chr", "start", "end", "width", "strand", "geneid", "exon_starts", "exon_ends", "gene"]
        )

        tempGenomeM = None

        if type == "tabix":
            gurl = url + genome + "/" + genome + ".txt.gz"
            # tabix genomes are not registered in self.genomes
            tempGenomeM = FileMeasurement("tabix", genome, genome, gurl, genome, **gene_kwargs)
            self.measurements.append(tempGenomeM)
        elif type == "efs-tsv":
            tempGenomeM = FileMeasurement("gtfparsed", genome, genome, url, genome=genome, **gene_kwargs)
            self.genomes[genome] = GtfParsedFile(url)
            self.measurements.append(tempGenomeM)
        elif type == "gtf":
            # GtfFile is not among the module-level imports, so this branch used
            # to fail with NameError.
            # NOTE(review): assumes the parser package exports GtfFile - confirm
            from ..parser import GtfFile

            tempGenomeM = FileMeasurement("gtf", genome, genome, url, genome=genome, **gene_kwargs)
            self.genomes[genome] = GtfFile(url)
            self.measurements.append(tempGenomeM)

        return tempGenomeM

    def get_measurements(self):
        """Get all available measurements

        When an emd map is in use, the measurements are first synced with emd:
        stale ones are removed and new ones imported.
        """
        if self.emd_map is not None:
            # this will remove measurements in self.measurements
            # that are not in the emd dbs any more
            logging.debug("Getting mesurements. Cur ms {}".format(list(self.measurements.get_mids())))
            new_records = self.emd_map.sync(self.measurements)
            self.import_records(new_records, fileHandler=self.emd_map.handler)

        return self.measurements.get_measurements()

    def get_measurement(self, ms_id):
        """Get a specific measurement
        """
        return self.measurements.get(ms_id)

    def get_genomes(self):
        """Get all available genomes
        """
        return self.genomes

    def import_trackhub(self, hub, handler=None):
        """Import measurements from a track hub.

        Args:
            hub: passed to `TrackHub` to build the hub
            handler: an optional filehandler to use

        Returns:
            list of the measurements found in the hub
        """
        trackhub = TrackHub(hub)

        measurements = []
        # `measurments` (sic) is the attribute name the original code reads
        for m in trackhub.measurments:
            if handler is not None:
                m.fileHandler = handler
            # register one by one: MeasurementSet.append expects an object with `mid`
            self.measurements.append(m)
            measurements.append(m)

        return measurements