Source code for pylimma.splicing

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
#   diffSplice.R                          Copyright (C) 2013-2025 Gordon Smyth,
#                                                                 Charity Law
#   topSplice.R                           Copyright (C) 2013-2025 Gordon Smyth,
#                                                                 Yunshun Chen
#   plotSplice.R                          Copyright (C) 2014-2025 Gordon Smyth,
#                                                                 Yifang Hu,
#                                                                 Yunshun Chen
# Python port: Copyright (C) 2026 John Mulvey
"""
Differential exon-usage testing for pylimma.

Faithful port of R limma's differential-splicing trio:

- ``diff_splice``: per-exon and per-gene tests from an exon-level fit.
- ``top_splice``: collate ``diff_splice`` output into a ranked DataFrame.
- ``plot_splice``: plot exons of one gene from ``diff_splice`` output.
"""

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd
from scipy.stats import f as f_dist
from scipy.stats import t as t_dist

from .classes import MArrayLM, _resolve_fit_input
from .squeeze_var import squeeze_var
from .utils import p_adjust


def _rowsum_sorted(values: np.ndarray, group_codes: np.ndarray, n_groups: int) -> np.ndarray:
    """Port of R's ``rowsum(values, group, reorder=FALSE)`` for the case
    where group codes are already in order-of-first-appearance.

    ``values`` has shape (nrows,) or (nrows, ncols). ``group_codes`` is
    an integer array mapping each row to a group code in [0, n_groups).
    Returns an array of shape (n_groups,) or (n_groups, ncols).
    """
    values = np.asarray(values)
    if values.ndim == 1:
        out = np.zeros(n_groups, dtype=np.float64)
        np.add.at(out, group_codes, values)
        return out
    out = np.zeros((n_groups, values.shape[1]), dtype=np.float64)
    np.add.at(out, group_codes, values)
    return out


[docs] def diff_splice( fit, geneid, exonid=None, robust: bool = False, legacy: bool = False, verbose: bool = True, *, key: str = "pylimma", ) -> MArrayLM: """Test for differential exon usage from an exon-level fit. Port of R limma's ``diffSplice.MArrayLM`` (Gordon Smyth and Charity Law, 2013-2025). The input ``fit`` must be an exon-resolution fit - either an :class:`MArrayLM` / dict, or an :class:`AnnData` whose ``adata.uns[key]`` holds the fit (as produced by ``pylimma.lm_fit(adata_exon, ...)``). Gene and exon identifiers are supplied via ``geneid`` and ``exonid`` (column names of ``fit['genes']``, or row-aligned arrays). For AnnData-stored fits, ``geneid`` / ``exonid`` may also be column names of ``adata.var`` since ``lm_fit`` propagates ``adata.var`` into ``fit['genes']`` via ``get_eawp``. The splice output has a different row count (genes with a single exon are dropped) from the input and carries a mix of per-exon and per-gene slots; it therefore isn't written back to the AnnData. The returned :class:`MArrayLM` is the input to :func:`top_splice` / :func:`plot_splice`. Parameters ---------- fit : AnnData, MArrayLM, or dict Exon-level fit. geneid, exonid, robust, legacy, verbose : See R's ``diffSplice``. key : str, default "pylimma" AnnData ``adata.uns`` key holding the fit. """ # Accept AnnData-stored fits (same contract as top_table / # decide_tests / genas). For MArrayLM / dict input this is a no-op. fit, _adata, _adata_key = _resolve_fit_input(fit, key) if not isinstance(fit, dict): raise ValueError("fit must be an MArrayLM, dict, or AnnData") # For AnnData input, prefer adata.var as the exon-annotation table # so string lookups like ``geneid="gene"`` find ``adata.var["gene"]`` # directly. fit["genes"] is a list of var_names on the AnnData path # (per the no-duplication decision for #2 in the audit), which would # otherwise fail a column-name lookup. n_exons = np.asarray(fit["coefficients"]).shape[0] if _adata is not None and _adata.var is not None and len(_adata.var.columns): exon_genes = _adata.var.copy() else: exon_genes = fit.get("genes") if exon_genes is None: exon_genes = pd.DataFrame({"ExonID": np.arange(1, n_exons + 1)}) else: exon_genes = ( exon_genes.copy() if isinstance(exon_genes, pd.DataFrame) else pd.DataFrame(exon_genes).copy() ) # Extract geneid and exonid if isinstance(geneid, str): genecolname = geneid geneid_arr = exon_genes[genecolname].values else: geneid_arr = np.asarray(geneid) exon_genes["GeneID"] = geneid_arr genecolname = "GeneID" if exonid is None: exoncolname = None exonid_arr = None else: if isinstance(exonid, str): exoncolname = exonid exonid_arr = exon_genes[exoncolname].values else: exonid_arr = np.asarray(exonid) exon_genes["ExonID"] = exonid_arr exoncolname = "ExonID" # Treat NA geneids as genes with one exon each geneid_arr = np.array([str(v) for v in geneid_arr.tolist()], dtype=object) isna_mask = pd.isna(geneid_arr) | (geneid_arr == "nan") | (geneid_arr == "None") if isna_mask.any(): na_idx = np.where(isna_mask)[0] geneid_arr = geneid_arr.copy() for k, idx in enumerate(na_idx): geneid_arr[idx] = f"NA{k + 1}" # Sort by (geneid, exonid). np.lexsort sorts by LAST key as primary, # so pass (exonid, geneid) to sort primarily by geneid. if exonid_arr is None: order = np.argsort(geneid_arr, kind="stable") else: exonid_str = np.array([str(v) for v in exonid_arr.tolist()]) order = np.lexsort((exonid_str, geneid_arr)) geneid_arr = geneid_arr[order] exon_genes = exon_genes.iloc[order].reset_index(drop=True) coefficients = np.asarray(fit["coefficients"], dtype=np.float64) if coefficients.ndim == 1: coefficients = coefficients.reshape(-1, 1) coefficients = coefficients[order, :] stdev_unscaled = np.asarray(fit["stdev_unscaled"], dtype=np.float64) if stdev_unscaled.ndim == 1: stdev_unscaled = stdev_unscaled.reshape(-1, 1) stdev_unscaled = stdev_unscaled[order, :] df_residual = np.asarray(fit["df_residual"], dtype=np.float64) if df_residual.ndim == 0: df_residual = np.full(n_exons, float(df_residual)) df_residual = df_residual[order] sigma = np.asarray(fit["sigma"], dtype=np.float64) exon_s2 = sigma[order] ** 2 # Genes with zero df get s2 = 0 (match R line 50) if df_residual.min() < 1e-6: exon_s2 = exon_s2.copy() exon_s2[df_residual < 1e-6] = 0.0 # Build group codes in order-of-first-appearance (reorder=FALSE). unique_genes, first_idx = np.unique(geneid_arr, return_index=True) # Sort unique by first_idx to preserve order-of-first-appearance order_first = np.argsort(first_idx, kind="stable") unique_genes_ord = unique_genes[order_first] gene_to_code = {g: i for i, g in enumerate(unique_genes_ord.tolist())} geneid_codes = np.array([gene_to_code[g] for g in geneid_arr.tolist()], dtype=np.int64) n_genes_total = len(unique_genes_ord) # Count exons per gene, sum df.residual, sum df.residual * s2 gene_nexons = _rowsum_sorted(np.ones(n_exons), geneid_codes, n_genes_total) gene_df_residual = _rowsum_sorted(df_residual, geneid_codes, n_genes_total) gene_df_res_s2 = _rowsum_sorted(df_residual * exon_s2, geneid_codes, n_genes_total) # Pooled gene variance with np.errstate(invalid="ignore", divide="ignore"): gene_s2 = np.where(gene_df_residual > 0, gene_df_res_s2 / gene_df_residual, 0.0) if verbose: one_exon = int(np.sum(gene_nexons == 1)) warnings.warn( f"diff_splice: {n_exons} exons, " f"{n_genes_total} genes, " f"{one_exon} with 1 exon, " f"mean {round(float(gene_nexons.mean()))} exons/gene, " f"max {int(gene_nexons.max())}", UserWarning, stacklevel=2, ) # R diffSplice defaults to ``legacy=FALSE``; pylimma.squeeze_var # auto-picks legacy=True for all-equal df, so forward the flag # explicitly to match R in both branches. squeeze = squeeze_var(var=gene_s2, df=gene_df_residual, robust=robust, legacy=legacy) # Keep only genes with > 1 exon gene_keep = gene_nexons > 1 ngenes = int(gene_keep.sum()) if ngenes == 0: raise ValueError("No genes with more than one exon") # Repeat gene_keep across each gene's exons. Because exons are sorted # by gene (order-of-first-appearance), we can tile gene_keep by # gene_nexons. exon_keep = np.repeat(gene_keep, gene_nexons.astype(np.int64)) geneid_arr = geneid_arr[exon_keep] exon_genes = exon_genes.iloc[exon_keep].reset_index(drop=True) coefficients = coefficients[exon_keep, :] stdev_unscaled = stdev_unscaled[exon_keep, :] df_residual = df_residual[exon_keep] gene_nexons_kept = gene_nexons[gene_keep] gene_df_test = gene_nexons_kept - 1 gene_df_residual_kept = gene_df_residual[gene_keep] df_prior = squeeze["df_prior"] df_prior_arr = np.atleast_1d(np.asarray(df_prior, dtype=np.float64)) if df_prior_arr.size > 1: df_prior_arr = df_prior_arr[gene_keep] gene_df_total = gene_df_residual_kept + df_prior_arr # Cap df_total at sum of residual dfs gene_df_total = np.minimum(gene_df_total, float(gene_df_residual_kept.sum())) var_post = np.asarray(squeeze["var_post"]) gene_s2_post = var_post[gene_keep] # Rebuild group codes on kept exons unique_genes_kept = unique_genes_ord[gene_keep] kept_gene_to_code = {g: i for i, g in enumerate(unique_genes_kept.tolist())} geneid_codes_kept = np.array( [kept_gene_to_code[g] for g in geneid_arr.tolist()], dtype=np.int64 ) # Per-gene weighted mean beta u2 = 1.0 / (stdev_unscaled**2) u2_rowsum = _rowsum_sorted(u2, geneid_codes_kept, ngenes) gene_betabar = _rowsum_sorted(coefficients * u2, geneid_codes_kept, ngenes) / u2_rowsum # t-statistics and F-statistics g = geneid_codes_kept coefficients_centred = coefficients - gene_betabar[g, :] exon_t = coefficients_centred / stdev_unscaled / np.sqrt(gene_s2_post[g])[:, None] exon_t_sq = exon_t**2 gene_F = _rowsum_sorted(exon_t_sq, geneid_codes_kept, ngenes) / gene_df_test[:, None] exon_1m_leverage = 1.0 - (u2 / u2_rowsum[g, :]) coefficients_final = coefficients_centred / exon_1m_leverage exon_t_final = exon_t / np.sqrt(exon_1m_leverage) with np.errstate(invalid="ignore"): gene_df_total_tiled = gene_df_total[g][:, None] # Broadcast exon_t_final (n_exons, n_coefs) vs df (n_exons, 1). exon_p_value = 2.0 * t_dist.sf(np.abs(exon_t_final), df=gene_df_total_tiled) gene_F_p_value = f_dist.sf(gene_F, dfn=gene_df_test[:, None], dfd=gene_df_total[:, None]) # Assemble exon-level output out = MArrayLM() out["genes"] = exon_genes out["genecolname"] = genecolname out["exoncolname"] = exoncolname out["coefficients"] = coefficients_final out["t"] = exon_t_final out["p_value"] = exon_p_value # Gene-level output out["gene_df_prior"] = df_prior_arr out["gene_df_residual"] = gene_df_residual_kept out["gene_df_total"] = gene_df_total out["gene_s2"] = gene_s2[gene_keep] out["gene_s2_post"] = gene_s2_post out["gene_F"] = gene_F out["gene_F_p_value"] = gene_F_p_value # Identify gene-level annotation columns (those constant within each # gene). First and last exon indices (0-based). gene_lastexon = np.cumsum(gene_nexons_kept.astype(np.int64)) - 1 gene_firstexon = gene_lastexon - gene_nexons_kept.astype(np.int64) + 1 # Per-column: is this column constant within each gene? gene_genes = exon_genes.iloc[gene_lastexon].reset_index(drop=True) is_gene_level = [] for col in exon_genes.columns: col_vals = exon_genes[col].values constant = True # Check that within each gene, all values equal the last-exon value for gi in range(ngenes): first = int(gene_firstexon[gi]) last = int(gene_lastexon[gi]) if last > first: ref = col_vals[last] for k in range(first, last): a = col_vals[k] b = ref # NaN-aware equality if pd.isna(a) and pd.isna(b): continue if a != b: constant = False break if not constant: break is_gene_level.append(constant) gene_level_cols = [c for c, keep in zip(exon_genes.columns, is_gene_level) if keep] gene_genes = gene_genes[gene_level_cols].copy() gene_genes.index = gene_genes[genecolname].astype(str).values gene_genes["NExons"] = gene_nexons_kept.astype(np.int64) out["gene_genes"] = gene_genes out["gene_firstexon"] = gene_firstexon out["gene_lastexon"] = gene_lastexon # Simes adjustment - for each coefficient column independently n_coefs = coefficients_final.shape[1] gene_simes = gene_F_p_value.copy() # Build the penalty vector (R lines 138-141): # penalty[i] = (within-gene rank) starting at 1; then inverse-ranked # by gene size: penalty = n_in_gene / within_gene_rank within_gene_rank = np.zeros(len(g), dtype=np.int64) for gi in range(ngenes): first = int(gene_firstexon[gi]) last = int(gene_lastexon[gi]) within_gene_rank[first : last + 1] = np.arange(1, last - first + 2) gene_nexons_per_exon = gene_nexons_kept[g].astype(np.float64) penalty = gene_nexons_per_exon / within_gene_rank for j in range(n_coefs): # Sort exons within gene by p-value ascending; multiply by penalty # in rank order; take min-adjusted p per gene. pj = exon_p_value[:, j] # lexsort: primary = g (gene), secondary = pj (within-gene p) # np.lexsort sorts by LAST key as primary. order_p = np.lexsort((pj, g)) p_ordered = pj[order_p] adj = p_ordered * penalty # Now take the minimum adjusted p per gene. Within each gene in # the sorted order, the minimum is the first since penalty is # monotonically non-increasing? Not quite - penalty = n / rank, # rank 1..n so penalty is DECREASING. p_ordered is INCREASING. # The product is not monotone. R's logic: # o <- order(g, p.adj) # out$gene.simes.p.value[,j] <- p.adj[o][gene.firstexon] # i.e., re-sort by (gene, adj) and take the first (smallest) per # gene. Mirror that. # We need the positions in the ORIGINAL (kept) exon ordering. # After order_p, group positions = g[order_p]. Re-sort by # (g_sorted, adj) and take per-gene minimum. g_sorted = g[order_p] order2 = np.lexsort((adj, g_sorted)) adj_final = adj[order2] # Per-gene first position in this double-sorted order g_double = g_sorted[order2] first_positions = np.zeros(ngenes, dtype=np.int64) # Walk through g_double to find first occurrence of each gene. # g_double is not sorted (g_sorted was sorted first, then ties # broken by adj). But gene groups are still contiguous because # we sorted on (g_sorted, adj). cur_gene = -1 for k in range(len(g_double)): if g_double[k] != cur_gene: first_positions[g_double[k]] = k cur_gene = int(g_double[k]) gene_simes[:, j] = adj_final[first_positions] out["gene_simes_p_value"] = gene_simes # Bonferroni adjustment gene_bonferroni = gene_F_p_value.copy() for j in range(n_coefs): pj = exon_p_value[:, j] order_p = np.lexsort((pj, g)) p_ordered = pj[order_p] # For each gene, take the first (smallest) p-value and multiply # by its n_exons. Bonferroni cap at 1. adj_bon = np.minimum( p_ordered[gene_firstexon] * gene_nexons_kept.astype(np.float64), 1.0, ) gene_bonferroni[:, j] = adj_bon out["gene_bonferroni_p_value"] = gene_bonferroni return out
[docs] def top_splice( fit, coef: int = -1, test: str = "F", number: int | float = 10, fdr: float = 1.0, sort_by: str = "p", ) -> pd.DataFrame: """Top-ranked splicing results from a ``diff_splice`` fit. Port of R limma's ``topSplice``. ``number=np.inf`` returns all rows. """ if fit.get("gene_genes") is None or "NExons" not in fit["gene_genes"].columns: raise ValueError("fit should be a fit object produced by diff_splice") # Normalise test test_low = test.lower() if test_low == "f": test_upper = "F" elif test_low == "simes": test_upper = "simes" elif test_low == "t": test_upper = "t" else: raise ValueError(f"test must be 'simes', 'F', or 't', got {test!r}") if sort_by not in ("p", "none", "logFC", "NExons"): raise ValueError(f"sort_by must be 'p', 'none', 'logFC', or 'NExons', got {sort_by!r}") if sort_by == "logFC" and test_upper != "t": raise ValueError("Sorting by logFC only available with t test") if sort_by == "NExons" and test_upper == "t": raise ValueError("Sorting by NExons only available with gene-level tests") # Resolve coef. R default: ncol(fit), i.e. the last column (1-based # ncol = last column). In pylimma 0-based convention, coef=-1 maps # to last column. n_coefs = np.asarray(fit["coefficients"]).shape[1] if coef == -1: c = n_coefs - 1 elif isinstance(coef, str): names_list = fit.get("contrast_names") or fit.get("coef_names") if names_list is None or coef not in list(names_list): raise ValueError(f"coef {coef!r} not found") c = list(names_list).index(coef) else: c = int(coef) if test_upper == "t": out = fit["genes"].copy() out["logFC"] = np.asarray(fit["coefficients"])[:, c] out["t"] = np.asarray(fit["t"])[:, c] out["P.Value"] = np.asarray(fit["p_value"])[:, c] elif test_upper == "F": out = fit["gene_genes"].copy() out["F"] = np.asarray(fit["gene_F"])[:, c] out["P.Value"] = np.asarray(fit["gene_F_p_value"])[:, c] else: # simes out = fit["gene_genes"].copy() out["P.Value"] = np.asarray(fit["gene_simes_p_value"])[:, c] out["FDR"] = p_adjust(out["P.Value"].values, method="BH") if fdr < 1.0: out = out[out["FDR"] <= fdr] number = min(int(number) if number != np.inf else len(out), len(out)) if number <= 1: return out if sort_by == "p": o = np.argsort(out["P.Value"].values, kind="stable") elif sort_by == "logFC": o = np.argsort(-np.abs(out["logFC"].values), kind="stable") elif sort_by == "NExons": # R: order(out$NExons, -out$P.Value, decreasing=TRUE) # decreasing=TRUE applies to BOTH keys via negation. # Primary: -NExons ascending = NExons descending # Secondary: -(-P.Value) ascending = P.Value ascending o = np.lexsort((out["P.Value"].values, -out["NExons"].values.astype(np.float64))) else: # none o = np.arange(len(out)) o = o[:number] return out.iloc[o].reset_index(drop=True)
[docs] def plot_splice( fit, coef: int = -1, geneid: str | None = None, genecolname: str | None = None, rank: int = 1, fdr: float = 0.05, xlab: str = "Exon", ax=None, **kwargs, ): """Plot exons or isoforms of a chosen gene. Port of R limma's ``plotSplice``. """ from .plotting import _require_matplotlib plt = _require_matplotlib() if genecolname is None: genecolname = fit.get("genecolname") else: genecolname = str(genecolname) n_coefs = np.asarray(fit["coefficients"]).shape[1] c = n_coefs - 1 if coef == -1 else int(coef) gene_F_p = np.asarray(fit["gene_F_p_value"]) gene_genes = fit["gene_genes"] if geneid is None: if rank == 1: i = int(np.argmin(gene_F_p[:, c])) else: i = int(np.argsort(gene_F_p[:, c], kind="stable")[rank - 1]) geneid = str(gene_genes.iloc[i][genecolname]) else: geneid = str(geneid) matches = np.where(gene_genes[genecolname].astype(str).values == geneid)[0] if len(matches) == 0: raise ValueError(f"geneid {geneid} not found") i = int(matches[0]) first = int(fit["gene_firstexon"][i]) last = int(fit["gene_lastexon"][i]) j = np.arange(first, last + 1) exoncolname = fit.get("exoncolname") ylab = f"logFC (this {xlab.lower()} vs rest)" coefs = np.asarray(fit["coefficients"])[j, c] if ax is None: _, ax = plt.subplots() ax.plot(np.arange(1, len(j) + 1), coefs, "o-", color="black") ax.set_ylabel(ylab) ax.set_title(geneid) if exoncolname is None: ax.set_xlabel(xlab) else: exon_ids = fit["genes"].iloc[j][exoncolname].astype(str).values ax.set_xticks(np.arange(1, len(j) + 1)) ax.set_xticklabels(exon_ids, rotation=90, fontsize=6) ax.set_xlabel(f"{xlab} {exoncolname}") # Mark significant exons top = top_splice(fit, coef=c, number=np.inf, test="t", sort_by="none") gene_label = gene_genes.iloc[i][genecolname] mask = top[genecolname].astype(str).values == str(gene_label) fdr_vals = top["FDR"].values[mask] sig = fdr_vals < fdr if np.any(sig): fdr_sig = fdr_vals[sig] if len(np.unique(fdr_sig)) == 1: cex = 1.5 else: abs_fdr = np.abs(np.log10(fdr_sig)) lo, hi = abs_fdr.min(), abs_fdr.max() cex = (abs_fdr - lo) / (hi - lo) * 1.0 + 1.0 x_positions = np.arange(1, len(j) + 1)[sig] y_positions = coefs[sig] sizes = np.atleast_1d(cex) * 25 ax.scatter( x_positions, y_positions, c="red", s=sizes if len(sizes) == len(x_positions) else sizes[0], marker="o", zorder=5, ) ax.axhline(0.0, linestyle="--", color="gray") return ax