Source code for epivizfileserver.parser.BamFile

import pysam
from .SamFile import SamFile
from .utils import toDataFrame

class BamFile(SamFile):
    """
    Bam File Class to parse bam files

    Args:
        file (str): file location can be local (full path) or hosted publicly
        columns ([str]) : column names for various columns in file

    Attributes:
        file: a pysam file object
        fileSrc: location of the file
        cacheData: cache of accessed data in memory
        columns: column names to use
        result: list of (chr, start, end, value) runs collected so far
        chr_temp, start_temp, end_temp, value_temp: the run currently being
            accumulated by ``get_bin`` (all ``None`` when no run is open)
    """

    def __init__(self, file, columns=None):
        self.file = pysam.AlignmentFile(file, "rb")
        self.fileSrc = file
        self.cacheData = {}
        self.columns = columns
        # get_bin reads these before anything else assigns them, so give
        # them a defined starting state here.
        self.result = []
        self.chr_temp = None
        self.start_temp = None
        self.end_temp = None
        self.value_temp = None

    def get_bin(self, x):
        """Fold one record into the run-length state kept on the instance.

        Consecutive records with the same ``get_num_aligned()`` value are
        merged into a single run. When the value changes, the finished run is
        appended to ``self.result`` as ``(chr, start, end, value)`` and a new
        run is opened at ``x``.

        Args:
            x: record exposing ``get_num_aligned()``, ``reference_name`` and
                ``reference_pos`` plus the attributes read in the return
                value below.

        Returns:
            tuple of ``(reference_name, reference_start, reference_end,
            query_alignment_sequence, query_sequence)`` read from ``x``.
        """
        depth = x.get_num_aligned()

        # Compare by value: identity (`is not`) treats equal integers as
        # different once they fall outside CPython's small-int cache.
        if self.value_temp is not None and self.value_temp != depth:
            self.result.append(
                (self.chr_temp, self.start_temp, self.end_temp, self.value_temp)
            )
            self.value_temp = None

        if self.value_temp is None:
            self.chr_temp = x.reference_name
            self.start_temp = x.reference_pos
            self.value_temp = depth

        # Track the exclusive end of the open run, mirroring getRange.
        self.end_temp = x.reference_pos + 1

        # NOTE(review): reference_pos/get_num_aligned look like pileup-column
        # attributes while reference_start/reference_end/query_* look like
        # aligned-segment attributes -- confirm which record type callers
        # actually pass in.
        return (
            x.reference_name,
            x.reference_start,
            x.reference_end,
            x.query_alignment_sequence,
            x.query_sequence,
        )

    # given an array, turn it into a df
    def to_DF(self, result):
        """Convert ``result`` to a DataFrame using ``self.columns`` as header."""
        return toDataFrame(result, self.columns)

    def to_msgpack(self, result):
        """Serialise ``result`` with ``toMsgpack``."""
        # NOTE(review): toMsgpack is not imported in the visible part of this
        # module -- confirm where it is expected to come from.
        return toMsgpack(result)

    def get_col_names(self, result):
        """Return the column names, installing the defaults if none are set.

        Args:
            result: unused; kept for interface compatibility.

        Returns:
            list of column names stored on ``self.columns``.
        """
        if self.columns is None:
            self.columns = ["chr", "start", "end", "number of sequence aligned"]
        return self.columns

    def getRange(self, chr, start, end, bins=2000, zoomlvl=-1, metric="AVG", respType = "DataFrame"):
        """Get data for a given genomic location

        Walks the pileup over the region and run-length encodes the number of
        aligned reads: consecutive positions with the same count are merged
        into one ``(chr, start, end, value)`` row.

        Args:
            chr (str): chromosome
            start (int): genomic start
            end (int): genomic end
            bins (int): not used by this implementation
            zoomlvl (int): not used by this implementation
            metric (str): not used by this implementation
            respType (str): result format type, default is "DataFrame"

        Returns:
            tuple ``(result, None)``:
                result
                    a DataFrame with matched regions from the input genomic
                    location if respType is "DataFrame", else a list of
                    ``(chr, start, end, value)`` tuples
                error
                    always ``None`` here; failures are raised instead

        Raises:
            Exception: if a ``ValueError`` is raised while querying or
                processing the region (reported as an unknown chromosome).
        """
        try:
            pileup_columns = self.file.pileup(chr, start, end)
            self.result = []

            result = []
            chrTemp = startTemp = endTemp = valueTemp = None

            # NOTE(review): a run is only closed when the depth changes; a
            # gap between reported positions does not start a new run --
            # confirm that this is intended.
            for x in pileup_columns:
                depth = x.get_num_aligned()
                if valueTemp is None:
                    chrTemp = x.reference_name
                    startTemp = x.reference_pos
                    valueTemp = depth
                elif valueTemp != depth:
                    # Value comparison, not identity: equal ints above the
                    # small-int cache are distinct objects.
                    result.append((chrTemp, startTemp, endTemp, valueTemp))
                    chrTemp = x.reference_name
                    startTemp = x.reference_pos
                    valueTemp = depth
                endTemp = x.reference_pos + 1

            # Flush the run that was still open when the iterator ended;
            # without this the last (or only) run is silently lost.
            if valueTemp is not None:
                result.append((chrTemp, startTemp, endTemp, valueTemp))

            # Ensures self.columns holds the default header when none was
            # supplied; tolerate an empty region instead of indexing into it.
            self.get_col_names(result[0] if result else None)

            if respType == "DataFrame":
                result = toDataFrame(result, self.columns)

            return result, None
        except ValueError:
            raise Exception("didn't find chromId with the given name")