Source code for epivizfileserver.server.utils

from ..parser import BigBed, BigWig, SamFile, BamFile, TbxFile, GtfFile, GtfParsedFile
import pandas
import ujson

def create_parser_object(format, source):
    """
    Create appropriate File class based on file format

    Args:
        format : Type of file
        source : location of the file

    Returns:
        An instance of the parser class
    """
    req_manager = {
        "BigWig": BigWig,
        "bigwig": BigWig,
        "bigWig": BigWig,
        "bw": BigWig,
        "BigBed": BigBed,
        "bigbed": BigBed,
        "bigBed": BigBed,
        "bb": BigBed,
        "sam": SamFile,
        "bam": BamFile,
        "tbx": TbxFile,
        "tabix": TbxFile,
        "gtf": GtfFile,
        "gtfparsed": GtfParsedFile
    }

    return req_manager[format](source)
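
A minimal usage sketch (not part of the module): the file path below is a placeholder, and the format key must match one of the keys in req_manager above, otherwise the dictionary lookup raises a KeyError.

    # placeholder path; any of the supported format strings ("bw", "bb", "bam", ...) works
    bw = create_parser_object("bigwig", "/path/to/signal.bw")
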
def format_result(input, params, offset=True):
    """
    Format result to an epiviz compatible format

    Args:
        input : input dataframe
        params : request parameters
        offset: defaults to True

    Returns:
        formatted JSON response
    """
    if len(input) > 0:
        input.start = input.start.astype("float")
        input.end = input.end.astype("float")

    globalStartIndex = None
    data = {
        "rows": {
            "globalStartIndex": globalStartIndex,
            "useOffset": offset,
            "values": {
                "id": None,
                "chr": [],
                "strand": [],
                "metadata": {}
            }
        },
        "values": {
            "globalStartIndex": globalStartIndex,
            "values": {}
        }
    }

    col_names = input.columns.values.tolist()
    row_names = ["chr", "start", "end", "strand", "id"]

    if len(input) > 0:
        globalStartIndex = input["start"].values.min()

        if offset:
            minStart = input["start"].iloc[0]
            minEnd = input["end"].iloc[0]
            input["start"] = input["start"].diff()
            input["end"] = input["end"].diff()
            input["start"].iloc[0] = minStart
            input["end"].iloc[0] = minEnd

        data = {
            "rows": {
                "globalStartIndex": globalStartIndex,
                "useOffset": offset,
                "values": {
                    "id": None,
                    "chr": [],
                    "strand": [],
                    "metadata": {}
                }
            },
            "values": {
                "globalStartIndex": globalStartIndex,
                "values": {}
            }
        }

        for col in col_names:
            if params.get("measurement") is not None and col in params.get("measurement"):
                data["values"]["values"][col] = input[col].values.tolist()
            elif col in row_names:
                data["rows"]["values"][col] = input[col].values.tolist()
            else:
                data["rows"]["values"]["metadata"][col] = input[col].values.tolist()
    else:
        data["rows"]["values"]["start"] = []
        data["rows"]["values"]["end"] = []

        if params.get("metadata") is not None:
            for met in params.get("metadata"):
                data["rows"]["values"]["metadata"][met] = []
        else:
            for col in col_names:
                if params.get("measurement") is not None and col in params.get("measurement"):
                    data["values"]["values"][col] = input[col].values.tolist()
                elif col in row_names:
                    data["rows"]["values"][col] = input[col].values.tolist()
                else:
                    data["rows"]["values"]["metadata"][col] = input[col].values.tolist()

        if params.get("measurement"):
            for col in params.get("measurement"):
                data["values"]["values"][col] = []

    data["rows"]["values"]["id"] = None

    return data
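
An illustrative call (hypothetical data, not from the source): build a small DataFrame with the chr/start/end columns the function expects plus one measurement column, and serialize the result with ujson.

    df = pandas.DataFrame({
        "chr": ["chr1", "chr1"],
        "start": [100, 200],
        "end": [150, 250],
        "score": [0.5, 0.7],
    })
    resp = format_result(df, {"measurement": ["score"]})
    # resp["values"]["values"]["score"] holds the measurement values;
    # resp["rows"]["values"] holds chr/start/end, with start/end offset-encoded as diffs
    print(ujson.dumps(resp))
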
def bin_rows(input, max_rows=2000):
    """
    Helper function to bin rows to resolution

    Args:
        input: dataframe to bin
        max_rows: resolution to scale rows to

    Returns:
        data frame with scaled rows
    """
    input_length = len(input)

    if input_length < max_rows:
        return input

    step = max_rows
    col_names = input.columns.values.tolist()

    input["rowGroup"] = range(0, input_length)
    input["rowGroup"] = pandas.cut(input["rowGroup"], bins=max_rows)
    input_groups = input.groupby("rowGroup")

    agg_dict = {}
    for col in col_names:
        if col in ["chr", "probe", "gene", "region"]:
            agg_dict[col] = 'first'
        elif col in ["start", "id"]:
            agg_dict[col] = 'min'
        elif col == "end":
            agg_dict[col] = 'max'
        else:
            agg_dict[col] = 'mean'

    bin_input = input_groups.agg(agg_dict)

    return bin_input
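
A hypothetical example of binning before formatting (column names and sizes are made up): 10,000 rows are aggregated into 400 groups, taking the min start, max end, and mean of the value column within each bin. Note that bin_rows adds a temporary rowGroup column to the input frame.

    big_df = pandas.DataFrame({
        "chr": ["chr1"] * 10000,
        "start": range(0, 1000000, 100),
        "end": range(100, 1000100, 100),
        "score": [1.0] * 10000,
    })
    binned = bin_rows(big_df, max_rows=400)
    print(len(binned))  # 400 binned rows
    resp = format_result(binned, {"measurement": ["score"]})
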