Source code for sincei.VCRfinder

import sys
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import ruptures as rpt
from ruptures.exceptions import BadSegmentationParameters
from scipy import sparse
from tqdm import tqdm

import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)


### Helper functions ###
[docs] def sparse_band_corr(X, k, chrom=None, verbose=True): """ Compute only the first k diagonals of the correlation matrix of X, stored in banded format. Works directly on sparse matrices. Parameters ---------- X : scipy.sparse matrix or np.ndarray Input data matrix of shape (n_samples, n_features). k : int Number of diagonals to compute (bandwidth). chrom : str or None, optional Chromosome name for progress bar description. verbose : bool, optional Whether to display progress bars. Returns ------- band_corr : np.ndarray, shape (2*k+1, n_features) Banded correlation matrix. """ n, p = X.shape band_corr = np.zeros((2 * k + 1, p)) # Main diagonal is always 1 band_corr[k, :] = 1.0 if sparse.issparse(X): # Work directly with sparse matrix (convert to CSC for efficient column access) X_csc = X.tocsc() if not sparse.isspmatrix_csc(X) else X # Compute column means and norms without densifying col_means = np.asarray(X_csc.mean(axis=0)).ravel() # Compute column norms: ||x - mean|| # = sqrt(sum(x^2) - 2*mean*sum(x) + n*mean^2) # = sqrt(sum(x^2) - n*mean^2) col_sq_sums = np.asarray(X_csc.multiply(X_csc).sum(axis=0)).ravel() col_norms = np.sqrt(col_sq_sums - n * col_means**2) col_norms[col_norms == 0] = np.inf # Compute correlations for each diagonal offset for d in tqdm(range(1, k + 1), desc=f"{chrom}: Computing banded covariance", disable=not verbose): n_pairs = p - d corr_vals = np.zeros(n_pairs) # Batch compute raw dot products: left_cols.T @ right_cols left_cols = X_csc[:, :n_pairs] right_cols = X_csc[:, d : d + n_pairs] # Compute element-wise products and sum per column pair # For sparse matrices, this is efficient with multiply + sum raw_dots = np.asarray(left_cols.multiply(right_cols).sum(axis=0)).ravel() # Centered dot products centered_dots = raw_dots - n * col_means[:n_pairs] * col_means[d : d + n_pairs] # Normalize to get correlations corr_vals = centered_dots / (col_norms[:n_pairs] * col_norms[d : d + n_pairs]) # Store in upper and lower bands (symmetric) band_corr[k - d, d:] = corr_vals band_corr[k + d, :n_pairs] = corr_vals else: # Dense path: use efficient einsum # Center and normalize columns Xc = X - X.mean(axis=0, keepdims=True) norms = np.linalg.norm(Xc, axis=0) norms[norms == 0] = np.inf Xn = Xc / norms # Off-diagonals for d in tqdm(range(1, k + 1), desc=f"{chrom}: Computing banded covariance", disable=not verbose): corr_vals = np.einsum( "ij,ij->j", Xn[:, : p - d], Xn[:, d:], optimize=True, ) band_corr[k - d, d:] = corr_vals band_corr[k + d, : p - d] = corr_vals return band_corr
[docs] def distance_kernel(sigma, truncate=4.0, radius=None): """ Create a square Gaussian distance kernel. Parameters ---------- sigma : float Standard deviation of the Gaussian. truncate : float, optional Truncate the kernel at this many standard deviations. Default is 4.0. radius : int, optional Radius of the kernel. If None, it is set to int(truncate * sigma). Returns ------- kernel : 2D numpy array The Gaussian distance kernel. """ if radius is None: radius = int(truncate * sigma) width = 1 + 2 * radius kernel = np.zeros((width, width)) for offset in range(1, width): kernel += offset * (np.eye(width, k=offset) + np.eye(width, k=-offset)) kernel += np.rot90(kernel) kernel /= 2 kernel = np.exp(-(kernel**2) / (2 * sigma**2)) # np.fill_diagonal(kernel, 0) # To not count self-correlation kernel /= kernel.sum() return kernel
[docs] def VCRfinder( adata, binsize, max_region, n_kernels=20, penalties=[1], region=None, verbose=False, n_threads=1, ): """ Detects variable chromatin regions (VCRs) from a anndata object containing genomic signal data in equally sized bins (see :ref:`scCountReads`) . First, a bin-to-bin correlation matrix is computed for each chromosome. Then, the correlation matrix is turned into a score map by convolving a number of square Gaussian kernels along its main diagonal. Each kernel has a sigma calculated using. Each kernel produces a 1-D score for each bin, which are stacked into a matrix where each row corresponds to a kernel scale and each column to a bin. Finally, the PELT change-point detection algorithm is applied to the score map to identify regions with distinct correlation patterns. This step depends on a penalty parameter that controls the number of detected regions. The function returns a pandas DataFrame containing the detected variable chromatin regions at each penalty. The DataFrame has columns: 'penalty', 'chrom', 'start', 'end'. Parameters ---------- adata : anndata.AnnData Input anndata object with binned chromatin data. `adata.var` must contain 'chrom', 'start', and 'end' columns. binsize : int Size of the bins in base pairs. max_region : int Size of the largest kernel in base pairs. n_kernels : int, optional Number of Gaussian kernels to use for convolution. Default is 20. penalties : list of float, optional List of penalty values for the change-point detection algorithm. Default is [1]. region : str, optional Genomic region to limit the analysis to (e.g., 'chr1:100000:200000'). Default is None. verbose : bool, optional Print progress messages and warnings. Default is False. n_threads : int, optional Number of threads to use for parallel processing, by default 1. Returns ------- output : pd.DataFrame Output DataFrame with detected variable chromatin regions at each penalty. """ adata.var[["start", "end"]] = adata.var[["start", "end"]].apply(pd.to_numeric) if region is not None: region = region.split(":") if len(region) == 3: chrom, start, end = region[0], int(region[1]), int(region[2]) elif len(region) == 2: chrom, start = region[0], int(region[1]) end = np.max(adata.var.loc[adata.var.chrom == chrom, "end"]) else: chrom = region[0] start = np.min(adata.var.loc[adata.var.chrom == chrom, "start"]) end = np.max(adata.var.loc[adata.var.chrom == chrom, "end"]) adata = adata[:, (adata.var["chrom"] == chrom) & (adata.var["start"] >= start) & (adata.var["end"] <= end)] chroms = adata.var["chrom"].unique() pen_bed_df = pd.DataFrame(columns=["chrom", "start", "end", "name", "score", "strand"]) for chrom in chroms: sys.stdout.write(f"Processing chromosome {chrom}...\n") adata_chrom = adata[:, adata.var["chrom"] == chrom] if adata_chrom.shape[1] == 1: sys.stderr.write(f"Skipping chromosome {chrom} with only one bin.\n") bkp_df = pd.DataFrame( { "chrom": [chrom] * len(penalties), "start": [int(adata_chrom.var["start"].values[0])] * len(penalties), "end": [int(adata_chrom.var["end"].values[0])] * len(penalties), "name": [f"pen-{pen}_brkpoint-1" for pen in penalties], "score": penalties, "strand": ["*"] * len(penalties), } ) pen_bed_df = pd.concat([pen_bed_df, bkp_df], ignore_index=True) continue start = adata_chrom.var["start"].min() # Sort the bins adata_chrom = adata_chrom[:, adata_chrom.var.sort_values(by=["start", "end"], axis=0).index] # rows where end - start != binsize mask = (adata_chrom.var["end"] - adata_chrom.var["start"]) != binsize # if the last row binsize is >= 1/2 of binsize, update its 'end' value last_idx = adata_chrom.var.index[-1] if mask.loc[last_idx]: if adata_chrom.var.loc[last_idx, "end"] - adata_chrom.var.loc[last_idx, "start"] >= (binsize / 2): adata_chrom.var.loc[last_idx, "end"] = adata_chrom.var.loc[last_idx, "start"] + binsize else: # eject last row sys.stderr.write(f"Feature {last_idx} removed due to difference in binsize\n") adata_chrom = adata_chrom[:, : adata_chrom.var.index[-2]] assert all( (adata_chrom.var["end"] - adata_chrom.var["start"] == binsize) | (adata_chrom.var["end"] - adata_chrom.var["start"] == (binsize - 1)) ), f"Variable bin sizes detected in chromosome {chrom}" reg_to_consider = min(max_region, max(int(adata_chrom.shape[1] * binsize / n_kernels), binsize)) # Calculate sigmas k_factor = (reg_to_consider / binsize) ** (1 / n_kernels) sigmas = 0.25 * k_factor ** np.arange(1, n_kernels + 1) # Calculate the number of diagonals needed (capped at number of bins) k = min(round(4.0 * np.max(sigmas)), adata_chrom.shape[1]) # Get bin-bin correlations ctime = time.time() corr = sparse_band_corr(adata_chrom.X, k=k, chrom=chrom, verbose=verbose) p = adata_chrom.shape[1] # number of bins ctime = time.time() - ctime if verbose: sys.stdout.write( f"Chromosome {chrom}: Banded correlation with {k} diagonals calculated in {ctime:.2f} seconds\n" ) scores = np.zeros((p, len(sigmas))) def _score_sigma(args): i, sigma = args radius = min(k, int(4.0 * sigma)) kernel = distance_kernel(sigma=sigma, radius=radius) k_width = kernel.shape[0] k_radius = k_width // 2 score_row = np.zeros(p) # Decompose 2D kernel convolution into 1D convolutions per diagonal # score[pos] = sum over (di, dj) of kernel[di+r, dj+r] * corr[pos+di, pos+dj] # Group by d = dj - di (the diagonal offset in corr) for d in range(-min(k_radius, k), min(k_radius, k) + 1): # Extract diagonal d from kernel: elements where col - row = d kernel_diag = np.diag(kernel, k=d) # shape: (k_width - |d|,) # Get correlation values for diagonal offset d from banded storage # band_corr[k - d, :] contains corr[i, i+d] for d >= 0 # band_corr[k - d, :] contains corr[i-d, i] for d < 0 corr_diag = corr[k - d, :] # shape: (p,) # Convolve: this computes sum of kernel_diag[j] * corr_diag[pos + j - offset] # We need to account for the offset in the kernel diagonal conv = np.convolve(corr_diag, kernel_diag[::-1], mode="same") score_row += conv return i, score_row if n_threads > 1: with ThreadPoolExecutor(max_workers=n_threads) as executor: for i, score_row in tqdm( executor.map(_score_sigma, list(enumerate(sigmas))), total=len(sigmas), desc=f"{chrom}: Calculating score matrix", disable=not verbose, ): scores[:, i] = score_row else: for i, sigma in tqdm( list(enumerate(sigmas)), desc=f"{chrom}: Calculating score matrix", disable=not verbose ): _, score_row = _score_sigma((i, sigma)) scores[:, i] = score_row # Use Pelt with L2 cost - O(n) memory # scores has shape (p, n_kernels) - each position is a feature vector algo = rpt.KernelCPD(kernel="linear", min_size=1, jump=1).fit(scores) def _predict_penalty(pen): """Predict breakpoints for a single penalty value.""" try: bkps = algo.predict(pen=pen) except BadSegmentationParameters: sys.stderr.write(f" - No breakpoints detected for penalty {pen} in chrom {chrom}.\n") return None sys.stdout.write(f"Detected {len(bkps)} VCRs in chromosome {chrom} with penalty {pen}\n") prevs = [0, *bkps[:-1]] bkp_df = pd.DataFrame( { "chrom": chrom, "start": [int(start + prev * 2000) for prev in prevs], "end": [int(start + bkp * 2000) for bkp in bkps], "name": [f"{chrom}_VCR_{bkp}_pen{pen}" for bkp in bkps], "score": pen, "strand": "*", } ) return bkp_df if n_threads > 1: with ThreadPoolExecutor(max_workers=n_threads) as executor: for bkp_df in tqdm( executor.map(_predict_penalty, penalties), total=len(penalties), desc=f"{chrom}: Change-point detection", disable=not verbose, ): if bkp_df is not None: pen_bed_df = pd.concat([pen_bed_df, bkp_df], ignore_index=True) else: for pen in tqdm(penalties, desc=f"{chrom}: Change-point detection", disable=not verbose): bkp_df = _predict_penalty(pen) if bkp_df is not None: pen_bed_df = pd.concat([pen_bed_df, bkp_df], ignore_index=True) sys.stdout.write("\n") return pen_bed_df