Source code for sincei.Utilities

from itertools import compress
from deeptools.utilities import getTLen
import numpy as np
import sys


[docs]def checkBAMtag(bam, name, tag): bctag = [read.has_tag(tag) for read in bam.head(1000)] if not any(bctag): sys.stderr.write("WARNING: Input file {} seems to lack the tag {}. Output might be empty. \n".format(name, tag)) return None
[docs]def checkMotifs(read, chrom, genome, readMotif, refMotif): """ Check whether a given motif is present in the read and the corresponding reference genome. For example, in MNAse (scChIC-seq) data, we expect the reads to have an 'A' at the 5'-end, while the genome has a 'TA' over hang (where the 'A' aligns with 'A' in the forward read), like this below. Forwards aligned read: read has 'A', upstream has T R1 ........A-------> ----------TA------------\ Ref (+) Rev aligned read: read has 'T', downstream has A <-------T....... R1 --------TA------------\ Ref (+) This function can look for any arbitrary motif in read and corresponding genome, but in the same orientation as described above. :return: bool >>> import pysam >>> import os >>> from scDeepTools.scReadCounter import CountReadsPerBin as cr >>> root = os.path.dirname(os.path.abspath(__file__)) + "/test/test_data/" >>> bam = pysam.AlignmentFile("{}/test_TA_filtering.bam".format(root)) >>> iter = bam.fetch() >>> read = next(iter) >>> cr.checkMotifs(read, 'A', 'TA') # for valid scChIC read True >>> read = next(iter) >>> cr.checkMotifs(read, 'A', 'TA') # for invalid scChIC read False """ # get read and ref motif pos read_motif = read.get_forward_sequence()[0 : len(readMotif)] ref_motifLen = len(refMotif) - 1 if read.is_reverse: # for reverse reads ref motif begins at read-end and ends downstream endpos = read.reference_end + ref_motifLen if endpos > genome.chroms()[chrom]: endpos = read.reference_end # fail without error ref_motif = genome.sequence(chrom, read.reference_end - 1, endpos) else: # for forward reads ref motif begins upstream and ends at read-start startpos = read.reference_start - ref_motifLen if startpos < 0: startpos = read.reference_start # fail without error ref_motif = genome.sequence(chrom, startpos, read.reference_start + 1) if read_motif == readMotif and ref_motif == refMotif: return True else: return False
[docs]def checkGCcontent(read, lowFilter, highFilter, returnGC=False): r"""Checks if the GC content of the read is within the given range Parameters ---------- read : pysam.AlignedSegment A pysam AlignedSegment object lowFilter : float Minimum GC content highFilter : float Maximum GC content returnGC : bool If true, return the GC content of the read Returns ------- bool True if the GC content of the read is within the given range Examples -------- >>> test = Tester() >>> read = test.bamFile1.fetch().next() >>> checkGCcontent(read, 0.3, 0.7) True """ seq = read.get_forward_sequence() total_bases = len(seq) gc_bases = len([x for x in seq if x == "C" or x == "G"]) gc_frac = float(gc_bases) / total_bases if returnGC: return gc_frac else: if gc_frac >= lowFilter and gc_frac <= highFilter: return True else: return False
[docs]def checkAlignedFraction(read, lowFilter): """ Check whether the fraction of read length that aligns to the reference is higher than the given threshold. Aligned fraction includes the max allowed mismatches tolerated by the aligner, and excludes InDels and Clippings. Return: Bool """ cig = read.cigartuples tot = read.infer_read_length() matchPos = [i[0] == 0 for i in cig] matchSum = sum([i[1] for i in list(compress(cig, matchPos))]) if matchSum / tot >= lowFilter: return True else: return False
[docs]def colorPicker(name): r""" This function returns a list of colors for plotting. Parameters ---------- name : str The name of the color palette to use. Returns ------- list A list of colors. Examples -------- >>> colorPicker('twentyfive') ['#e41a1c', '#377eb8', '#4daf4a', '#984ea3', '#ff7f00', '#ffff33', '#a65628', '#f781bf', '#999999'] >>> colorPicker('colorblind') ['#0072B2', '#009E73', '#D55E00', '#CC79A7', '#F0E442', '#56B4E9', '#E69F00', '#000000'] """ colors = { "twentyfive": [ "#dd4e34", "#78d545", "#b047dc", "#d6d94d", "#5d48c6", "#74d88b", "#c944af", "#8a9739", "#542c75", "#d3953c", "#607dc7", "#487f46", "#d04774", "#7bd2c8", "#6b2737", "#cfcb9a", "#332a42", "#d7928a", "#343d25", "#cc8ad3", "#7b6a43", "#b5bad9", "#99472b", "#4e8290", "#936987", ], "colorblind": [ "#ffaec0", "#cd7600", "#893558", "#195f37", "#da71f9", "#a2d391", "#881e9b", "#b9d05f", "#524d7e", "#f2bf4b", "#01d6c2", "#f54040", "#0097fb", "#756400", "#4b44aa", "#f0bd79", "#a1008b", "#6d4d02", "#ff9afc", "#01df8f", "#e2b8ed", "#6e9d00", "#f4177b", "#01b65f", "#9b2532", ], } return colors[name]
[docs]def getDupFilterTuple(read, bc, filterArg): r""" Returns a tuple with the information needed to filter duplicates, based on read and filter type. The tuple is composed of the barcode, the umi, the start and end positions and the chromosome name. Parameters ---------- read : pysam.AlignedSegment A pysam.AlignedSegment object bc : str The barcode filter : str A string with the type of filter to use. Returns ------- tuple A tuple with the information needed to filter duplicates. The tuple is composed of the barcode, the umi, the start and end positions and the chromosome name. Examples -------- >>> test = Tester() >>> read = test.bamFile1.fetch().next() >>> getDupFilterTuple(read, 'ATCG', 'end_umi') ('ATCG', 'ATCG', None, None, 0, False) """ tLenDup = getTLen(read, notAbs=True) filt = filterArg.split("_") # get fragment start and end for that read if tLenDup >= 0: s = read.pos e = s + tLenDup else: s = read.pnext e = s - tLenDup if "end" not in filt: # ignore read/fragment end and mate information mate_refid = read.reference_id if read.is_reverse: s = None else: e = None else: # use mate info, reset fragment end to mate pos if read is chimeric if read.reference_id != read.next_reference_id: e = read.pnext mate_refid = read.next_reference_id # get UMI if asked if "umi" in filt: try: umi = read.get_tag("RX") except KeyError: sys.stderr.write("UMI tag (RX) absent, skipping UMI check") umi = None else: umi = None tup = (bc, umi, s, e, mate_refid, read.is_reverse) return tup
[docs]def gini(i, X): r"""Computes the Gini coefficient for each row of a sparse matrix (Obs*Var). Parameters ---------- i : int row index X : numpy array matrix Returns ------- float Gini coefficient for the given row Examples -------- >>> X = np.matrix([[1,2,3,4],[5,6,7,8],[9,10,11,12]]) >>> gini(0, X) 0.0 >>> gini(1, X) 0.0 >>> gini(2, X) 0.0 """ # based on bottom eq: # http://www.statsdirect.com/help/generatedimages/equations/equation154.svg # from: # http://www.statsdirect.com/help/default.htm#nonparametric_methods/gini.htm # All values are treated equally, arrays must be 1d: array = X[i, :].A.flatten() # get all bins from i'th cell array = array[array.nonzero()] if array.shape[0] <= 1: return np.nan else: array = np.sort(array) # Index per array element: index = np.arange(1, array.shape[0] + 1) # Number of array elements: n = array.shape[0] # Gini coefficient: return (np.sum((2 * index - n - 1) * array)) / (n * np.sum(array))