Source code for epivizfileserver.parser.GtfFile

import pysam
from .utils import toDataFrame
from .Helper import get_range_helper
import pandas as pd
from aiocache import cached, Cache
from aiocache.serializers import JsonSerializer, PickleSerializer

[docs]class GtfFile(object): """ GTF File Class to parse gtf/gff files Args: file (str): file location can be local (full path) or hosted publicly columns ([str]) : column names for various columns in file Attributes: file: a pysam file object fileSrc: location of the file cacheData: cache of accessed data in memory columns: column names to use """ def __init__(self, file, columns=["chr", "source", "feature", "start", "end", "score", "strand", "frame", "group"]): self.fileSrc = file self.columns = columns print("Loading annotations", file) gtf = pd.read_csv(file, sep="\t", names = columns) print("Parsing gene names") gtf["gene_id"] = gtf["group"].apply(self.parse_attribute, key="gene_id").replace('"', "") # self.file["gene_idx"] = self.file["gene_id"].replace('"', "") print("Parsing transcript ids") gtf["transcript_id"] = gtf["group"].apply(self.parse_attribute, key="transcript_id") # self.file = self.file.set_index("gene_idx") # print("Groupby genes and collapse exon positions") # genes = gtf.groupby("gene_id") self.file = gtf # self.file = pd.DataFrame(columns=["chr", "start", "end", "width", "strand", "geneid", "exon_starts", "exon_ends", "gene"]) # for name, gdf in genes: # gdf_exons = gdf[(gdf["feature"].str.contains("exon", case=False, regex=True))] # if len(gdf_exons) == 0: # gdf_exons = gdf # rec = { # "chr": gdf["chr"].unique()[0], # "start": gdf["start"].values.min(), # "end": gdf["end"].values.max(), # "width": gdf["end"].values.max() - gdf["start"].values.min(), # "strand": gdf["strand"].unique()[0], # "geneid": name.replace('"', ""), # "exon_starts": ",".join(str(n) for n in gdf_exons["start"].values), # "exon_ends": ",".join(str(n) for n in gdf_exons["end"].values), # "gene": name.replace('"', "") # } # self.file = self.file.append(rec, ignore_index=True) print("Indexing by intervals") self.file["start_idx"] = self.file["start"] self.file["end_idx"] = self.file["end"] self.file = self.file.set_index(['start_idx', 'end_idx']) self.file.index = pd.IntervalIndex.from_tuples(self.file.index) print("Parsing chromsomes and their lengths") chromosomes = [] groupByChr = self.file.groupby("chr") for name, gdf in groupByChr: chromosomes.append([name, 1, int(gdf["end"].values.max())]) self.chromosomes = chromosomes
[docs] def parse_attribute(self, item, key): if key in item: tstr = item.split(key, 1) tstrval = tstr[1].split(";", 1) return tstrval[0][1:] else: return None
[docs] def search_gene(self, query, maxResults = 5): result = [] err = None try: if len(query) > 1: genome = self.file[self.file["gene_id"].str.contains(query, na=False, case=False)] genes = genome.groupby("gene_id") for name, gdf in genes: rec = { "chr": gdf["chr"].unique()[0], "start": int(gdf["start"].values.min()), "end": int(gdf["end"].values.max()), "gene": name.replace('"', "") } result.append(rec) # for row, index in genes.head(maxResults): # rec = { # "chr": row["chr"], # "start": row["start"], # "end": row["end"], # "gene": row["gene_id"] # } # result.append(rec) return result, err except Exception as e: return {}, str(e)
[docs] def get_col_names(self): return self.columns
[docs] def getRange(self, chr, start, end, bins=2000, zoomlvl=-1, metric="AVG", respType = "DataFrame"): """Get data for a given genomic location Args: chr (str): chromosome start (int): genomic start end (int): genomic end respType (str): result format type, default is "DataFrame Returns: result a DataFrame with matched regions from the input genomic location if respType is DataFrame else result is an array error if there was any error during the process """ result = pd.DataFrame(columns=["chr", "start", "end", "width", "strand", "geneid", "exon_starts", "exon_ends", "gene"]) try: grange = self.file[(self.file.index.left <= end) & (self.file.index.right >= start) & (self.file["chr"] == chr)] # grange = grange.sort_values(by=["start", "end"]) if len(grange) > 0: genes = grange.groupby("gene_id") for name, gdf in genes: gdf_exons = gdf[(gdf["feature"].str.contains("exon", case=False, regex=True))] if len(gdf_exons) == 0: gdf_exons = gdf rec = { "chr": gdf["chr"].unique()[0], "start": int(gdf["start"].values.min()), "end": int(gdf["end"].values.max()), "width": int(gdf["end"].values.max()) - int(gdf["start"].values.min()), "strand": gdf["strand"].unique()[0], "geneid": name.replace('"', ""), "exon_starts": ",".join(str(int(n)) for n in gdf_exons["start"].values), "exon_ends": ",".join(str(int(n)) for n in gdf_exons["end"].values), "gene": name.replace('"', "") } result = result.append(rec, ignore_index=True) return result, None else: return result, "no genes in the current region" except Exception as e: return result, str(e)
[docs] @cached(ttl=None, cache=Cache.MEMORY, serializer=PickleSerializer(), namespace="gtfsearchgene") async def searchGene(self, query, maxResults = 5): return self.search_gene(query, maxResults)
[docs] @cached(ttl=None, cache=Cache.MEMORY, serializer=PickleSerializer(), namespace="gtfgetdata") async def get_data(self, chr, start, end, bins=2000, zoomlvl=-1, metric="AVG", respType = "DataFrame"): return self.getRange(chr, start, end, bins=bins, zoomlvl=zoomlvl, metric=metric, respType=respType)