# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
# R/goana.R Copyright (C) 2014-2022 Gordon Smyth, Yifang Hu
# R/kegga.R Copyright (C) 2015-2022 Gordon Smyth, Yifang Hu
# R/goanaTrend.R Copyright (C) 2022 Gordon Smyth
# Python port: Copyright (C) 2026 John Mulvey
"""
Gene-ontology and KEGG pathway over-representation analysis.
Public API
----------
goana, top_go Gene-ontology over-representation, port of R
limma's ``goana`` and ``topGO``.
kegga, top_kegg KEGG pathway over-representation, port of R
limma's ``kegga`` and ``topKEGG``.
goana_trend Per-gene DE-probability estimate from a covariate
(used internally for length / abundance bias
correction), port of R limma's ``goanaTrend``.
Phase-1 scope
-------------
This first-cut port focuses on the database-free path. Two cuts versus
R limma:
1. **Bioconductor database lookups dropped.** R limma's ``goana.default``
reaches into ``GO.db`` / ``org.<species>.eg.db``; ``kegga.default``
reaches into the live KEGG REST API. pylimma deliberately does not
wrap Bioconductor annotation databases (see
``policy_data_class_wrappers``). Both ports require the caller to
supply a ``gene_pathway`` data frame mapping gene IDs to pathway IDs
(the universal interface, fully database-free). The
``getGeneKEGGLinks`` and ``getKEGGPathwayNames`` helpers, which exist
in R only to fetch from the KEGG REST API, are not ported.
2. **BiasedUrn deferred.** R's ``trend=TRUE`` path performs Wallenius'
noncentral hypergeometric test via ``BiasedUrn::pWNCHypergeo`` /
``dWNCHypergeo`` for length / abundance bias correction. BiasedUrn
is GPL-2-or-later (compatible) but a substantial side-quest and is
staged to a separate Phase 2. Phase 1 raises
``NotImplementedError`` whenever ``trend`` is truthy or a covariate
is supplied to a ``*.default`` method - silently falling back to
plain hypergeometric would be wrong-but-plausible-looking.
The plain-hypergeometric branch (``trend=False``) ports verbatim and is
numerically validated against R limma fixtures.
gene_pathway format
-------------------
A pandas ``DataFrame`` (or anything ``DataFrame(...)`` accepts) with at
least two columns:
* column 0 - gene id
* column 1 - pathway id (GO term, KEGG pathway, ...)
* column 2 *(optional, goana only)* - GO ontology, one of "BP" / "CC"
/ "MF". Surfaces in the output's ``ontology`` column and lets
``top_go`` filter by ontology.
* column 3 *(optional, goana only)* - human-readable term name.
Surfaces in the output's ``term`` column. Without this column the
``term`` column repeats the pathway id verbatim, since pylimma has
no GO.db lookup to fall back to.
"""
from __future__ import annotations
import warnings
import numpy as np
import pandas as pd
from scipy import stats
from .classes import MArrayLM, _is_anndata, _resolve_fit_input
from .utils import p_adjust, tricube_moving_average
# ---------------------------------------------------------------------------
# Public dispatchers
# ---------------------------------------------------------------------------
[docs]
def goana(
de,
gene_pathway,
universe=None,
*,
species=None,
prior_prob=None,
covariate=None,
plot=False,
fdr=0.05,
trend=False,
**kwargs,
) -> pd.DataFrame:
"""
Gene-ontology over-representation analysis.
Port of R limma's ``goana`` (``goana.R``). See module docstring for
the gene_pathway format and the Phase-1 scope cuts.
Parameters
----------
de : MArrayLM, AnnData, dict-of-vectors, or 1-D vector of gene IDs
Differentially-expressed genes. When a fit object is supplied,
the up- and down-regulated gene sets are derived internally
(BH adjustment at level ``fdr``).
gene_pathway : DataFrame
Mapping from gene IDs to GO terms. See module docstring.
universe : sequence of str, optional
Background gene universe. Defaults to all gene IDs in
``gene_pathway``. Required - without GO.db there is no
meaningful default beyond the supplied gene_pathway.
species : str, optional
Accepted for R-API compatibility; ignored without GO.db.
prior_prob : sequence of float, optional
Per-gene null DE probability (R name: ``null.prob``).
covariate : sequence of float, optional
Covariate values aligned to the universe; if supplied,
``trend`` becomes implicit and ``goana_trend`` is run.
Phase 1 raises ``NotImplementedError``.
plot : bool, default False
Forwarded to ``goana_trend`` when applicable.
fdr : float, default 0.05
FDR threshold for selecting up/down DE genes from a fit object
(R name: ``FDR``). Only used in the MArrayLM branch.
trend : bool, numeric, or str, default False
Use length / abundance bias correction. Phase 1 raises
``NotImplementedError`` for any truthy / numeric / character
value.
**kwargs
``coef`` and ``geneid`` for the MArrayLM branch.
"""
if _is_dispatch_marraylm(de):
return _goana_marraylm(
de,
gene_pathway=gene_pathway,
universe=universe,
species=species,
prior_prob=prior_prob,
covariate=covariate,
plot=plot,
fdr=fdr,
trend=trend,
**kwargs,
)
return _goana_default(
de,
gene_pathway=gene_pathway,
universe=universe,
species=species,
prior_prob=prior_prob,
covariate=covariate,
plot=plot,
trend=trend,
)
[docs]
def kegga(
de,
gene_pathway,
pathway_names=None,
universe=None,
*,
species=None,
prior_prob=None,
covariate=None,
plot=False,
fdr=0.05,
trend=False,
**kwargs,
) -> pd.DataFrame:
"""
KEGG pathway over-representation analysis.
Port of R limma's ``kegga`` (``kegga.R``). See module docstring for
the gene_pathway format and the Phase-1 scope cuts.
All parameters except ``pathway_names`` have identical semantics to
:func:`goana`; see that function's docstring for full descriptions.
Parameters
----------
pathway_names : DataFrame, optional
Two-column mapping from pathway id (column 0) to human-readable
description (column 1). If absent, the output's ``pathway``
column repeats the pathway id verbatim.
"""
if _is_dispatch_marraylm(de):
return _kegga_marraylm(
de,
gene_pathway=gene_pathway,
pathway_names=pathway_names,
universe=universe,
species=species,
prior_prob=prior_prob,
covariate=covariate,
plot=plot,
fdr=fdr,
trend=trend,
**kwargs,
)
return _kegga_default(
de,
gene_pathway=gene_pathway,
pathway_names=pathway_names,
universe=universe,
species=species,
prior_prob=prior_prob,
covariate=covariate,
plot=plot,
trend=trend,
)
def _is_dispatch_marraylm(de) -> bool:
if _is_anndata(de):
return True
if isinstance(de, MArrayLM):
return True
if isinstance(de, dict) and "coefficients" in de and "p_value" in de:
return True
return False
# ---------------------------------------------------------------------------
# MArrayLM dispatch helpers (port of goana.MArrayLM, kegga.MArrayLM)
# ---------------------------------------------------------------------------
def _select_de_from_fit(fit, geneid, coef, fdr) -> tuple[list[str], list[str], list[str]]:
"""Return (universe, up_ids, down_ids) from an MArrayLM-shaped fit.
Mirrors lines 13-77 of goana.R / kegga.R: validate the fit, resolve
geneid (vector vs. column-name reference), select FDR-significant
up/down genes via BH at level ``fdr``.
"""
if fit.get("coefficients") is None:
raise ValueError("de does not appear to be a valid MArrayLM fit object.")
if fit.get("p_value") is None:
raise ValueError("p.value not found in fit object, perhaps need to run e_bayes first.")
coefs = np.asarray(fit["coefficients"])
if coefs.ndim == 1:
coefs = coefs.reshape(-1, 1)
pvals = np.asarray(fit["p_value"])
if pvals.ndim == 1:
pvals = pvals.reshape(-1, 1)
ngenes, ncoef = coefs.shape
if coef is None:
coef = ncoef - 1
if hasattr(coef, "__len__") and len(coef) != 1:
raise ValueError("Only one coef can be specified.")
if hasattr(coef, "__len__"):
coef = coef[0]
if isinstance(coef, str):
coef_names = list(fit.get("coef_names") or [])
if not coef_names:
# Try column index fallback via design.
design = fit.get("design")
if design is not None and hasattr(design, "columns"):
coef_names = list(design.columns)
if coef not in coef_names:
raise ValueError(f"coef {coef!r} not found in fit")
coef_idx = coef_names.index(coef)
else:
coef_idx = int(coef)
if coef_idx < 0 or coef_idx >= ncoef:
raise ValueError(f"coef index {coef_idx} out of range")
# Resolve geneid: either a length-ngenes vector or a single column name
# in fit['genes'].
if geneid is None:
genes = fit.get("genes")
if genes is not None and hasattr(genes, "index"):
universe = [str(v) for v in genes.index]
else:
universe = [str(i) for i in range(ngenes)]
elif isinstance(geneid, str) or (
hasattr(geneid, "__len__")
and len(geneid) == 1
and not isinstance(geneid, (np.ndarray, pd.Series))
):
col = geneid if isinstance(geneid, str) else geneid[0]
genes = fit.get("genes")
if genes is None or col not in getattr(genes, "columns", []):
raise ValueError(f"Column {col} not found in de$genes")
universe = [str(v) for v in genes[col].values]
else:
gid_arr = np.asarray(geneid).ravel()
if gid_arr.size != ngenes:
raise ValueError("geneid of incorrect length")
universe = [str(v) for v in gid_arr]
if not isinstance(fdr, (int, float, np.integer, np.floating)):
raise ValueError("FDR must be numeric and of length 1.")
fdr = float(fdr)
if fdr < 0 or fdr > 1:
raise ValueError("FDR should be between 0 and 1.")
pvec = pvals[:, coef_idx]
cvec = coefs[:, coef_idx]
# Drop rows where geneid is NA (R's anyNA branch).
isna = np.array(
[
v is None
or (isinstance(v, float) and np.isnan(v))
or v == ""
or v == "nan"
or v == "NA"
for v in universe
]
)
if isna.all():
raise ValueError("Gene IDs are all NA")
if isna.any():
keep = ~isna
universe = [u for u, k in zip(universe, keep) if k]
pvec = pvec[keep]
cvec = cvec[keep]
fdr_coef = p_adjust(pvec, method="BH")
up_mask = (fdr_coef < fdr) & (cvec > 0)
dn_mask = (fdr_coef < fdr) & (cvec < 0)
up = [u for u, m in zip(universe, up_mask) if m]
dn = [u for u, m in zip(universe, dn_mask) if m]
return universe, up, dn
def _goana_marraylm(
de, *, gene_pathway, universe, species, prior_prob, covariate, plot, fdr, trend, **kwargs
):
"""Port of goana.MArrayLM (goana.R:3-86)."""
fit, _adata, _adata_key = _resolve_fit_input(de, kwargs.pop("key", "fit"))
if "universe" in kwargs:
raise ValueError("goana.MArrayLM defines its own universe")
if (not isinstance(trend, bool) or trend) and "covariate" in kwargs:
raise ValueError("goana.MArrayLM defines its own covariate")
geneid = kwargs.pop("geneid", None)
coef = kwargs.pop("coef", None)
if kwargs:
raise TypeError(f"unexpected keyword arguments: {sorted(kwargs)}")
if not isinstance(trend, bool):
# R accepts numeric or character trend; both go through the
# BiasedUrn path which is deferred.
raise NotImplementedError(
"trend correction requires the BiasedUrn port; see pylimma roadmap."
)
fit_universe, up, dn = _select_de_from_fit(fit, geneid, coef, fdr)
if not up and not dn:
warnings.warn("No DE genes")
return pd.DataFrame()
return _goana_default(
{"Up": up, "Down": dn},
gene_pathway=gene_pathway,
universe=fit_universe,
species=species,
prior_prob=prior_prob,
covariate=covariate,
plot=plot,
trend=trend,
)
def _kegga_marraylm(
de,
*,
gene_pathway,
pathway_names,
universe,
species,
prior_prob,
covariate,
plot,
fdr,
trend,
**kwargs,
):
"""Port of kegga.MArrayLM (kegga.R:3-86)."""
fit, _adata, _adata_key = _resolve_fit_input(de, kwargs.pop("key", "fit"))
if "universe" in kwargs:
raise ValueError("kegga.MArrayLM defines its own universe")
if (not isinstance(trend, bool) or trend) and "covariate" in kwargs:
raise ValueError("kegga.MArrayLM defines its own covariate")
geneid = kwargs.pop("geneid", None)
coef = kwargs.pop("coef", None)
if kwargs:
raise TypeError(f"unexpected keyword arguments: {sorted(kwargs)}")
if not isinstance(trend, bool):
raise NotImplementedError(
"trend correction requires the BiasedUrn port; see pylimma roadmap."
)
fit_universe, up, dn = _select_de_from_fit(fit, geneid, coef, fdr)
if not up and not dn:
warnings.warn("No DE genes")
return pd.DataFrame()
return _kegga_default(
{"Up": up, "Down": dn},
gene_pathway=gene_pathway,
pathway_names=pathway_names,
universe=fit_universe,
species=species,
prior_prob=prior_prob,
covariate=covariate,
plot=plot,
trend=trend,
)
# ---------------------------------------------------------------------------
# default-method helpers
# ---------------------------------------------------------------------------
def _ensure_de_lists(de) -> dict[str, np.ndarray]:
"""Port of the R 'ensure de is a list, dedupe, name uniquely' block.
Mirrors goana.R:160-179 and kegga.R:93-112.
"""
if isinstance(de, dict):
names = list(de.keys())
sets = list(de.values())
elif hasattr(de, "__iter__") and not isinstance(de, (str, bytes)):
names = ["DE"]
sets = [list(de)]
else:
raise ValueError("components of de should be vectors")
out: dict[str, np.ndarray] = {}
cleaned_names: list[str] = []
for nm, vec in zip(names, sets):
if isinstance(vec, pd.DataFrame):
raise ValueError(
"de should be a list of character vectors. It should not be a data.frame."
)
try:
arr = np.asarray(vec).ravel()
except Exception as e:
raise ValueError("components of de should be vectors") from e
# Unique-stable, R semantics: order of first appearance, cast to str.
seen = set()
uniq = []
for v in arr:
sv = str(v)
if sv not in seen:
seen.add(sv)
uniq.append(sv)
cleaned_names.append("" if nm is None else str(nm).strip())
out[nm] = np.array(uniq, dtype=object)
# Replace empty / NA-like names with DE<i> and disambiguate duplicates.
final_names: list[str] = []
for i, nm in enumerate(cleaned_names, start=1):
if nm == "" or nm.lower() == "nan":
final_names.append(f"DE{i}")
else:
final_names.append(nm)
seen_count: dict[str, int] = {}
for nm in final_names:
seen_count[nm] = seen_count.get(nm, 0) + 1
width: dict[str, int] = {
nm: 1 + int(np.floor(np.log10(c))) if c > 1 else 0 for nm, c in seen_count.items()
}
occ: dict[str, int] = {nm: 0 for nm in seen_count}
disambiguated: list[str] = []
for nm in final_names:
if seen_count[nm] > 1:
occ[nm] += 1
disambiguated.append(f"{nm}{occ[nm]:0{width[nm]}d}")
else:
disambiguated.append(nm)
keyed = {dn: out[on] for dn, on in zip(disambiguated, list(out.keys()))}
return keyed
def _normalise_universe(universe, prior_prob, covariate, gene_pathway_first_col):
"""Apply R's NA-removal, dedup, and (optional) restrict-to-pathway logic.
Mirrors goana.R:122-156 and kegga.R:155-202 for the explicit-universe
path (the implicit / database-built path doesn't apply here).
"""
universe = np.asarray(universe).astype(str).ravel()
lu = universe.size
if prior_prob is not None:
prior_prob = np.asarray(prior_prob, dtype=np.float64).ravel().copy()
if prior_prob.size != lu:
raise ValueError("universe and null.prob must have same length")
if covariate is not None:
covariate = np.asarray(covariate, dtype=np.float64).ravel().copy()
if covariate.size != lu:
raise ValueError("universe and covariate must have same length")
# NA filtering
isna = np.array([u in ("", "nan", "NA", "None") for u in universe]) | (universe == "")
# numpy strings compare equal to "nan" already; explicit check above is
# for object-dtype inputs.
if covariate is not None:
isna |= np.isnan(covariate)
if prior_prob is not None:
isna |= np.isnan(prior_prob)
if isna.all():
raise ValueError("Gene IDs are all NA")
if isna.any():
keep = ~isna
universe = universe[keep]
if covariate is not None:
covariate = covariate[keep]
if prior_prob is not None:
prior_prob = prior_prob[keep]
# Dedup universe (keep first occurrence). R takes !duplicated
# (so first occurrence is kept), and slices covariate / prior_prob by
# the same mask.
_, first_idx = np.unique(universe, return_index=True)
if first_idx.size != universe.size:
keep = np.zeros(universe.size, dtype=bool)
keep[first_idx] = True
universe = universe[keep]
if covariate is not None:
covariate = covariate[keep]
if prior_prob is not None:
prior_prob = prior_prob[keep]
return universe, prior_prob, covariate
def _normalise_gene_pathway(gene_pathway, *, optional_extra_cols=False):
"""Validate and dedupe a gene_pathway DataFrame.
Mirrors kegga.R:122-138 / goana.R:153-156:
- require 2 cols
- drop rows where either of the first two cols is NA
- drop duplicate (col1, col2) rows
Returns a fresh DataFrame keyed off the *first two* columns
(renamed gene_id, pathway_id) with optional extra columns
preserved when ``optional_extra_cols`` is True (used by goana to
carry ontology / term lookups through).
"""
if gene_pathway is None:
raise ValueError(
"gene_pathway is required; pylimma does not bundle GO.db / "
"KEGG REST lookups (see policy_data_class_wrappers)."
)
df = pd.DataFrame(gene_pathway).copy()
if df.shape[1] < 2:
raise ValueError("gene.pathway must have at least 2 columns")
cols = list(df.columns)
df = df.rename(columns={cols[0]: "gene_id", cols[1]: "pathway_id"})
df["gene_id"] = df["gene_id"].astype(str)
df["pathway_id"] = df["pathway_id"].astype(str)
# Drop rows where either of the first two cols is NA
isna = (
df[["gene_id", "pathway_id"]].isna().any(axis=1)
| (df["gene_id"].isin(["", "nan", "NA"]))
| (df["pathway_id"].isin(["", "nan", "NA"]))
)
df = df.loc[~isna].copy()
# Drop duplicate (gene_id, pathway_id) rows
df = df.drop_duplicates(subset=["gene_id", "pathway_id"], keep="first")
if not optional_extra_cols and df.shape[1] > 2:
df = df.iloc[:, :2]
return df
def _hypergeometric_pvalues(
S_counts: np.ndarray, S_N: np.ndarray, nde: np.ndarray, NGenes: int
) -> np.ndarray:
"""Vectorised port of the phyper(...,lower.tail=FALSE) loop.
R: ``PValue[,j] <- phyper(S[,1L+j]-0.5, nde[j], NGenes-nde[j], S[,N], lower.tail=FALSE)``
R's ``phyper(q, m, n, k, lower.tail=FALSE)`` is ``P(X > q)`` for a
hypergeometric draw of ``k`` from a pool of ``m`` successes and
``n`` failures. With ``q = S - 0.5`` and integer ``S``, this is
``P(X >= S)``. ``scipy.stats.hypergeom.sf(k, M, n, N)`` is
``P(X > k)``; we set ``k = S - 1`` so it computes ``P(X >= S)``.
Verified against R numerically.
"""
# S_counts: (n_pathways, n_de_lists)
# S_N: (n_pathways,)
# nde: (n_de_lists,)
npw, nsets = S_counts.shape
out = np.zeros_like(S_counts, dtype=np.float64)
for j in range(nsets):
# P(X >= s) where X ~ Hypergeom(M=NGenes, n_success=nde[j], N_sample=S_N[i])
out[:, j] = stats.hypergeom.sf(S_counts[:, j] - 1, NGenes, int(nde[j]), S_N)
return out
def _goana_default(de, *, gene_pathway, universe, species, prior_prob, covariate, plot, trend):
"""Port of goana.default (goana.R:88-260)."""
if not isinstance(trend, bool) or trend:
raise NotImplementedError(
"trend correction requires the BiasedUrn port; see pylimma roadmap."
)
if covariate is not None:
raise NotImplementedError(
"trend correction requires the BiasedUrn port; see pylimma roadmap."
)
# gene_pathway with optional ontology (col 3) + term (col 4) for
# the goana output.
gp = _normalise_gene_pathway(gene_pathway, optional_extra_cols=True)
has_ontology = gp.shape[1] >= 3
has_term = gp.shape[1] >= 4
ont_col = gp.columns[2] if has_ontology else None
term_col = gp.columns[3] if has_term else None
de_lists = _ensure_de_lists(de)
if universe is None:
universe = pd.unique(gp["gene_id"])
if prior_prob is not None or covariate is not None:
warnings.warn("Ignoring covariate and null.prob because universe not set")
prior_prob = None
covariate = None
universe, prior_prob, _covariate = _normalise_universe(
universe, prior_prob, covariate, gp["gene_id"]
)
NGenes = universe.size
if NGenes < 1:
raise ValueError("No annotated genes found in universe")
# Restrict DE genes to universe.
universe_set = set(universe.tolist())
for nm in list(de_lists.keys()):
de_lists[nm] = np.array([g for g in de_lists[nm] if g in universe_set], dtype=object)
# Restrict pathways to universe.
gp_in_universe = gp[gp["gene_id"].isin(universe_set)].reset_index(drop=True)
if gp_in_universe.empty:
raise ValueError("Pathways do not overlap with universe")
# Build incidence matrix in R's column order: N, then one column per
# DE list. ``rowsum(X, group, reorder=FALSE)`` in pandas is a groupby
# on the pathway id with sort=False to preserve first-occurrence
# order.
set_names = list(de_lists.keys())
columns = ["N"] + set_names
X = pd.DataFrame(0, index=gp_in_universe.index, columns=columns, dtype=np.int64)
X["N"] = 1
for nm in set_names:
members = set(de_lists[nm].tolist())
X[nm] = gp_in_universe["gene_id"].isin(members).astype(np.int64)
grouped = X.groupby(gp_in_universe["pathway_id"].values, sort=False).sum()
pathway_ids = grouped.index.to_numpy()
S_N = grouped["N"].to_numpy(dtype=np.int64)
S_counts = grouped[set_names].to_numpy(dtype=np.int64)
nde = np.array([de_lists[nm].size for nm in set_names], dtype=np.int64)
PValue = _hypergeometric_pvalues(S_counts, S_N, nde, NGenes)
# Term and ontology lookups: use the first occurrence of each
# pathway_id in the (possibly extended) gene_pathway table.
first_rows = gp.drop_duplicates(subset=["pathway_id"], keep="first").set_index("pathway_id")
term_lookup = first_rows[term_col] if has_term else None
ont_lookup = first_rows[ont_col] if has_ontology else None
out = pd.DataFrame(index=pathway_ids)
if has_term:
out["term"] = [term_lookup.get(p, p) for p in pathway_ids]
else:
out["term"] = list(pathway_ids)
if has_ontology:
out["ontology"] = [ont_lookup.get(p, np.nan) for p in pathway_ids]
else:
out["ontology"] = np.nan
out["n"] = S_N
for j, nm in enumerate(set_names):
out[nm.lower()] = S_counts[:, j]
for j, nm in enumerate(set_names):
out[f"p_{nm.lower()}"] = PValue[:, j]
return out
def _kegga_default(
de, *, gene_pathway, pathway_names, universe, species, prior_prob, covariate, plot, trend
):
"""Port of kegga.default (kegga.R:88-280)."""
if not isinstance(trend, bool) or trend:
raise NotImplementedError(
"trend correction requires the BiasedUrn port; see pylimma roadmap."
)
if covariate is not None:
raise NotImplementedError(
"trend correction requires the BiasedUrn port; see pylimma roadmap."
)
de_lists = _ensure_de_lists(de)
gp = _normalise_gene_pathway(gene_pathway, optional_extra_cols=False)
# Pathway names lookup
if pathway_names is not None:
pn = pd.DataFrame(pathway_names).copy()
if pn.shape[1] < 2:
raise ValueError("pathway.names must have at least 2 columns")
pn = pn.rename(columns={pn.columns[0]: "_path_id", pn.columns[1]: "_description"})
pn["_path_id"] = pn["_path_id"].astype(str)
pn["_description"] = pn["_description"].astype(str)
pn_isna = pn[["_path_id", "_description"]].isna().any(axis=1)
pn = pn.loc[~pn_isna].copy()
pn_lookup = dict(zip(pn["_path_id"], pn["_description"]))
else:
pn_lookup = None
if universe is None:
universe = pd.unique(gp["gene_id"])
if prior_prob is not None or covariate is not None:
warnings.warn("Ignoring covariate and null.prob because universe not set")
prior_prob = None
covariate = None
universe, prior_prob, _covariate = _normalise_universe(
universe, prior_prob, covariate, gp["gene_id"]
)
NGenes = universe.size
if NGenes < 1:
raise ValueError("No annotated genes found in universe")
universe_set = set(universe.tolist())
for nm in list(de_lists.keys()):
de_lists[nm] = np.array([g for g in de_lists[nm] if g in universe_set], dtype=object)
gp_in_universe = gp[gp["gene_id"].isin(universe_set)].reset_index(drop=True)
if gp_in_universe.empty:
raise ValueError("Pathways do not overlap with universe")
set_names = list(de_lists.keys())
columns = ["N"] + set_names
X = pd.DataFrame(0, index=gp_in_universe.index, columns=columns, dtype=np.int64)
X["N"] = 1
for nm in set_names:
members = set(de_lists[nm].tolist())
X[nm] = gp_in_universe["gene_id"].isin(members).astype(np.int64)
grouped = X.groupby(gp_in_universe["pathway_id"].values, sort=False).sum()
pathway_ids = grouped.index.to_numpy()
S_N = grouped["N"].to_numpy(dtype=np.int64)
S_counts = grouped[set_names].to_numpy(dtype=np.int64)
nde = np.array([de_lists[nm].size for nm in set_names], dtype=np.int64)
PValue = _hypergeometric_pvalues(S_counts, S_N, nde, NGenes)
out = pd.DataFrame(index=pathway_ids)
if pn_lookup is not None:
out["pathway"] = [pn_lookup.get(p, np.nan) for p in pathway_ids]
else:
out["pathway"] = list(pathway_ids)
out["n"] = S_N
for j, nm in enumerate(set_names):
out[nm.lower()] = S_counts[:, j]
for j, nm in enumerate(set_names):
out[f"p_{nm.lower()}"] = PValue[:, j]
return out
# ---------------------------------------------------------------------------
# top_go / top_kegg
# ---------------------------------------------------------------------------
[docs]
def top_go(
results, ontology=("BP", "CC", "MF"), sort=None, number=20, truncate_term=None, p_value=1.0
) -> pd.DataFrame:
"""
Extract the top GO terms from a :func:`goana` result.
Port of R limma's ``topGO``. ``results`` must be a DataFrame in the
shape returned by :func:`goana` (columns ``term``, ``ontology``,
``n``, then one count column per DE list, then one ``p_*`` column
per DE list).
"""
if not isinstance(results, pd.DataFrame):
raise ValueError("results should be a data.frame.")
valid_ont = {"BP", "CC", "MF"}
ontology_arr = list(dict.fromkeys(ontology))
for o in ontology_arr:
if o not in valid_ont:
raise ValueError(f"'arg' should be one of 'BP', 'CC', 'MF': got {o!r}")
if len(ontology_arr) < 3 and "ontology" in results.columns:
results = results.loc[results["ontology"].isin(ontology_arr)]
dimres = results.shape
if not isinstance(number, (int, np.integer, float, np.floating)):
raise ValueError("number should be a positive integer")
number = int(number)
if number > dimres[0]:
number = dimres[0]
if number < 1:
return results.iloc[0:0]
nsets = (dimres[1] - 3) // 2
if nsets < 1:
raise ValueError("results has wrong number of columns")
setnames = list(results.columns[3 : 3 + nsets])
if sort is None:
isort = list(range(nsets))
else:
sort_arr = [str(s).lower() for s in ([sort] if isinstance(sort, str) else list(sort))]
isort = [i for i, nm in enumerate(setnames) if nm.lower() in sort_arr]
if not isort:
raise ValueError("sort column not found in results")
p_cols = [results.columns[3 + nsets + i] for i in isort]
if len(p_cols) == 1:
P = results[p_cols[0]].to_numpy(dtype=np.float64)
else:
P = results[p_cols].min(axis=1).to_numpy(dtype=np.float64)
if p_value < 1:
number = min(number, int(np.sum(P <= p_value)))
if number < 1:
return results.iloc[0:0]
N_arr = results["n"].to_numpy()
Term_arr = results["term"].to_numpy()
# R: order(P, results$N, results$Term) - stable multi-key sort.
order = np.lexsort((Term_arr, N_arr, P))
keep = order[:number]
tab = results.iloc[keep].copy()
if truncate_term is not None:
tt = int(np.atleast_1d(truncate_term).ravel()[0])
tt = max(tt, 5)
tt = min(tt, 1000)
tm2 = tt - 3
new_terms = []
for v in tab["term"]:
sv = str(v) if v is not None else ""
if len(sv) > tm2:
sv = sv[:tm2] + "..."
new_terms.append(sv)
tab["term"] = new_terms
return tab
[docs]
def top_kegg(results, sort=None, number=20, truncate_path=None, p_value=1.0) -> pd.DataFrame:
"""
Extract the top KEGG pathways from a :func:`kegga` result.
Port of R limma's ``topKEGG``.
"""
if not isinstance(results, pd.DataFrame):
raise ValueError("results should be a data.frame.")
dimres = results.shape
if not isinstance(number, (int, np.integer, float, np.floating)):
raise ValueError("number should be a positive integer")
number = int(number)
if number > dimres[0]:
number = dimres[0]
if number < 1:
return results.iloc[0:0]
nsets = (dimres[1] - 2) // 2
if nsets < 1:
raise ValueError("results has wrong number of columns")
setnames = list(results.columns[2 : 2 + nsets])
if sort is None:
isort = list(range(nsets))
else:
sort_arr = [str(s).lower() for s in ([sort] if isinstance(sort, str) else list(sort))]
isort = [i for i, nm in enumerate(setnames) if nm.lower() in sort_arr]
if not isort:
raise ValueError("sort column not found in results")
p_cols = [results.columns[2 + nsets + i] for i in isort]
if len(p_cols) == 1:
P = results[p_cols[0]].to_numpy(dtype=np.float64)
else:
P = results[p_cols].min(axis=1).to_numpy(dtype=np.float64)
if p_value < 1:
number = min(number, int(np.sum(P <= p_value)))
if number < 1:
return results.iloc[0:0]
N_arr = results["n"].to_numpy()
Path_arr = results["pathway"].to_numpy()
order = np.lexsort((Path_arr, N_arr, P))
keep = order[:number]
tab = results.iloc[keep].copy()
if truncate_path is not None:
tt = int(np.atleast_1d(truncate_path).ravel()[0])
tt = max(tt, 5)
tt = min(tt, 1000)
tm2 = tt - 3
new_paths = []
for v in tab["pathway"]:
sv = str(v) if v is not None else ""
if len(sv) > tm2:
sv = sv[:tm2] + "..."
new_paths.append(sv)
tab["pathway"] = new_paths
return tab
# ---------------------------------------------------------------------------
# goana_trend (port of goanaTrend.R)
# ---------------------------------------------------------------------------
[docs]
def goana_trend(index_de, covariate, n_prior=10, plot=False) -> np.ndarray:
"""
Estimate per-gene DE probability from a covariate.
Port of R limma's ``goanaTrend`` (``goanaTrend.R``). Used internally
by :func:`goana` and :func:`kegga` for length / abundance bias
correction. Stand-alone use is supported and validated against R.
"""
index_de = np.asarray(index_de)
if index_de.dtype != bool and np.any(pd.isna(index_de)):
raise ValueError("index.de should not contain missing values")
covariate = np.asarray(covariate, dtype=np.float64).copy()
ngenes = covariate.size
if ngenes == 0:
return np.zeros(0, dtype=np.float64)
p = np.zeros(ngenes, dtype=np.float64)
isDE = np.zeros(ngenes, dtype=np.float64)
if index_de.dtype == bool:
if index_de.size != ngenes:
raise ValueError("index.de length must match covariate length")
isDE[index_de] = 1.0
else:
# 0-based integer indices.
idx_int = np.asarray(index_de, dtype=np.int64).ravel()
isDE[idx_int] = 1.0
nDE = isDE.sum()
p_mean = max(nDE / ngenes, 1e-5)
# NA branch in covariate.
if np.any(np.isnan(covariate)):
isna = np.isnan(covariate)
p[isna] = p_mean
sub_cov = covariate[~isna]
sub_isde = isDE[~isna].astype(bool)
# Recurse with bool index aligned to the sub-vector.
p[~isna] = goana_trend(sub_isde, sub_cov, n_prior=n_prior, plot=plot)
return p
# R: o <- order(covariate). Stable for ties (ascending).
o = np.argsort(covariate, kind="stable")
isDE_o = isDE[o]
# span = approx(c(20,200), c(1,0.5), xout=nDE, rule=2, ties=...)$y
# rule=2 -> clamp at boundaries, NOT linear extrapolation.
if nDE <= 20:
span = 1.0
elif nDE >= 200:
span = 0.5
else:
# Linear interpolation between (20, 1) and (200, 0.5).
span = 1.0 + (0.5 - 1.0) * (nDE - 20) / (200 - 20)
p_o = tricube_moving_average(isDE_o, span=span)
p_o = (p_mean * n_prior + p_o * nDE) / (n_prior + nDE)
p[o] = p_o
return p