# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
# geneset-ids2indices.R Copyright (C) 2009-2015 Gordon Smyth,
# Yifang Hu
# geneset-roast.R Copyright (C) 2008-2020 Gordon Smyth,
# Di Wu
# geneset-fry.R Copyright (C) 2015-2020 Gordon Smyth,
# Goknur Giner
# geneset-camera.R Copyright (C) 2007-2025 Gordon Smyth,
# Di Wu
# geneset-cameraPR.R Copyright (C) 2017-2025 Gordon Smyth
# geneset-romer.R Copyright (C) 2009-2015 Gordon Smyth,
# Yifang Hu
# geneset-wilcox.R Copyright (C) 2004-2012 Gordon Smyth
# rankSumTestWithCorrelation.R Copyright (C) 2007-2012 Gordon Smyth,
# Di Wu
# lmEffects.R Copyright (C) 2016-2020 Gordon Smyth
# Python port: Copyright (C) 2026 John Mulvey
"""
Gene-set testing routines for pylimma.
Faithful ports of R limma's competitive and self-contained gene-set tests:
``ids2indices``, ``roast``/``mroast`` (rotation test), ``fry``
(closed-form roast limit), ``camera`` (correlation-aware competitive
test), ``camera_pr`` (preranked variant), ``romer`` (rotation mean-rank),
and the Wilcoxon-style ``gene_set_test`` / ``rank_sum_test_with_correlation``.
RNG note
--------
``roast``, ``mroast``, ``romer`` and the simulation branch of
``gene_set_test`` use rotations drawn from a random normal distribution.
R's Mersenne-Twister stream is not reproduced byte-for-byte on the Python
side; these functions take an ``rng`` argument that is forwarded to
``numpy.random.default_rng``. Deterministic summaries (``ngenes``,
observed set statistics, active proportions) match R to machine precision;
Monte-Carlo p-values from independently-seeded streams agree to within
the Monte-Carlo sampling error (see ``known_diff_roast_rng.md``).
"""
from __future__ import annotations
import warnings
from typing import Any
import numpy as np
import pandas as pd
from scipy import linalg, stats
from .classes import get_eawp
from .lmfit import non_estimable
from .squeeze_var import fit_f_dist, squeeze_var
from .utils import (
_zscore_t_bailey,
p_adjust,
zscore_t,
)
# ---------------------------------------------------------------------------
# ids2indices
# ---------------------------------------------------------------------------
[docs]
def ids2indices(
gene_sets,
identifiers,
remove_empty: bool = True,
) -> dict:
"""
Map named gene sets of identifier strings to zero-based integer indices.
Port of R limma's ``ids2indices``. Indices are returned in Python's
zero-based convention (the R version returns 1-based indices); this
is what the downstream Python gene-set functions expect.
Parameters
----------
gene_sets : dict or list
Either a dict mapping set names to iterables of identifiers, or a
single iterable (wrapped as ``{"Set1": gene_sets}``, matching R's
``if(!is.list(gene.sets))`` branch).
identifiers : array_like of str
Identifier vector; the returned indices are positions in this
vector.
remove_empty : bool, default True
Drop sets that contain no matches.
Returns
-------
dict[str, np.ndarray]
Dict mapping each set name to an ``int64`` array of zero-based
indices into ``identifiers``.
"""
if not isinstance(gene_sets, dict):
gene_sets = {"Set1": gene_sets}
ids = np.asarray(identifiers).astype(str)
# Build position lookup in insertion order, keeping only the first
# occurrence of each identifier (matches R's which(identifiers %in% x)
# semantics when identifiers contain duplicates).
lookup: dict[str, int] = {}
for pos, name in enumerate(ids):
if name not in lookup:
lookup[name] = pos
out: dict[str, np.ndarray] = {}
for name, members in gene_sets.items():
members_arr = np.asarray(members).astype(str)
hits = np.fromiter(
(lookup[m] for m in members_arr if m in lookup),
dtype=np.int64,
)
hits.sort()
out[name] = hits
if remove_empty:
out = {k: v for k, v in out.items() if v.size > 0}
return out
# ---------------------------------------------------------------------------
# _lm_effects (port of R limma's .lmEffects)
# ---------------------------------------------------------------------------
def _lm_effects(
y,
design: np.ndarray | None = None,
contrast: Any = None,
array_weights: np.ndarray | None = None,
gene_weights: np.ndarray | None = None,
weights: np.ndarray | None = None,
block=None,
correlation: float | None = None,
) -> np.ndarray:
"""
Compute matrix of effects from gene-wise linear models.
Port of R limma's ``.lmEffects``. Returns an (ngenes, df_residual+1)
matrix whose first column is the primary effect (for the chosen
contrast) and whose remaining columns are residual effects (in
arbitrary but fixed order).
"""
ea = get_eawp(y)
expr = ea["exprs"]
ngenes, n = expr.shape
if np.any(~np.isfinite(expr)):
raise ValueError("All y values must be finite and non-NA")
if design is None:
design = ea.get("design")
if design is None:
raise ValueError("design matrix not specified")
design = np.asarray(design, dtype=np.float64)
if design.ndim == 1:
design = design.reshape(-1, 1)
if design.shape[0] != n:
raise ValueError("row dimension of design matrix must match column dimension of data")
p = design.shape[1]
if n <= p:
raise ValueError("No residual degrees of freedom")
# Default contrast: last column of design.
if contrast is None:
contrast = p - 1
# Reform design so that the contrast is the last coefficient.
contrast_arr = np.atleast_1d(np.asarray(contrast))
if (
contrast_arr.ndim == 1
and contrast_arr.size == 1
and np.issubdtype(contrast_arr.dtype, np.integer)
):
k = int(contrast_arr.item())
if k < 0 or k >= p:
raise ValueError(f"contrast index {k} out of range for design with {p} columns")
if np.all(design[:, k] == 0):
raise ValueError("contrast all zero")
if k < p - 1:
keep = [j for j in range(p) if j != k]
X = np.column_stack([design[:, keep], design[:, k : k + 1]])
else:
X = design.copy()
else:
# contrast is a vector of length p: rotate design so the contrast
# direction is the last column. Implements contrastAsCoef(first=FALSE).
contrast_vec = np.asarray(contrast, dtype=np.float64)
if contrast_vec.size != p:
raise ValueError("length of contrast must match column dimension of design")
if np.all(contrast_vec == 0):
raise ValueError("contrast all zero")
Q_c, R_c = linalg.qr(contrast_vec.reshape(-1, 1), mode="full")
# R: design <- t(qr.qty(QR, t(design))) = design %*% Q.
rotated = design @ Q_c
if np.sign(R_c[0, 0]) < 0:
rotated[:, 0] = -rotated[:, 0]
# Move the (now-first) contrast coefficient to the last column.
X = np.column_stack([rotated[:, 1:], rotated[:, 0:1]])
# Allow array.weights to be alternatively passed via 'weights'.
if array_weights is None and weights is not None and np.asarray(weights).size == n:
array_weights = weights
weights = None
if array_weights is not None:
aw = np.asarray(array_weights, dtype=np.float64)
if aw.size != n:
raise ValueError("Length of array.weights doesn't match number of arrays")
if np.any(aw <= 0) or np.any(np.isnan(aw)):
raise ValueError("array.weights must be positive")
# Allow gene.weights to be alternatively passed via 'weights'.
if gene_weights is None and weights is not None and np.asarray(weights).size == ngenes:
gene_weights = weights
weights = None
if gene_weights is not None:
gw = np.asarray(gene_weights, dtype=np.float64)
if gw.size != ngenes:
raise ValueError("Length of gene.weights doesn't match number of genes")
if np.any(gw <= 0) or np.any(np.isnan(gw)):
raise ValueError("gene.weights must be positive")
if weights is None:
weights = ea.get("weights")
if weights is not None:
w_mat = np.asarray(weights, dtype=np.float64)
if w_mat.shape != (ngenes, n):
raise ValueError("weights must have same dimensions as y")
if np.any(w_mat <= 0) or np.any(np.isnan(w_mat)):
raise ValueError("weights must be positive")
else:
w_mat = None
y_mat = expr.copy()
# Divide out array weights.
if array_weights is not None:
ws = np.sqrt(np.asarray(array_weights, dtype=np.float64))
X = X * ws[:, np.newaxis]
y_mat = y_mat * ws # column-wise scale
array_weights = None
# Correlation matrix for block design.
R_chol = None
if block is not None:
if correlation is None:
raise ValueError("correlation must be set")
block_vec = np.asarray(block)
if block_vec.size != n:
raise ValueError("Length of block does not match number of arrays")
ub = np.unique(block_vec)
Z = (block_vec[:, None] == ub[None, :]).astype(np.float64)
cormatrix = Z @ (correlation * Z.T)
np.fill_diagonal(cormatrix, 1.0)
R_chol = linalg.cholesky(cormatrix, lower=False)
if w_mat is None:
# R: y <- t(backsolve(R, t(y), transpose=TRUE))
# X <- backsolve(R, X, transpose=TRUE)
y_mat = linalg.solve_triangular(R_chol, y_mat.T, trans="T").T
X = linalg.solve_triangular(R_chol, X, trans="T")
# QR decomposition (fast path).
Q, R_qr = linalg.qr(X, mode="full")
rank = int(np.sum(np.abs(np.diag(R_qr)) > 1e-10))
if rank < p:
raise ValueError("design must be full column rank")
if w_mat is None:
# R: Effects <- t(qr.qty(qrX, t(y)))
# qr.qty applies Q^T. Effects has shape (ngenes, n).
Effects = (Q.T @ y_mat.T).T
signc = np.sign(R_qr[p - 1, p - 1])
if p > 1:
# Keep columns p..n (1-indexed) -> p-1..n-1 (0-indexed)
Effects = Effects[:, p - 1 :]
if signc < 0:
Effects[:, 0] = signc * Effects[:, 0]
else:
Effects = np.zeros((ngenes, n), dtype=np.float64)
signc = np.zeros(ngenes)
ws_mat = np.sqrt(w_mat)
for g in range(ngenes):
wX = X * ws_mat[g, :, np.newaxis]
wy = y_mat[g, :] * ws_mat[g, :]
if R_chol is not None:
wy = linalg.solve_triangular(R_chol, wy, trans="T")
wX = linalg.solve_triangular(R_chol, wX, trans="T")
Qg, Rg = linalg.qr(wX, mode="full")
signc[g] = np.sign(Rg[p - 1, p - 1])
Effects[g, :] = Qg.T @ wy
if p > 1:
Effects = Effects[:, p - 1 :]
Effects[:, 0] = signc * Effects[:, 0]
if gene_weights is not None:
gw = np.asarray(gene_weights, dtype=np.float64)
Effects = np.sqrt(gw)[:, np.newaxis] * Effects
return Effects
# ---------------------------------------------------------------------------
# roast / mroast
# ---------------------------------------------------------------------------
_SQRT2 = np.sqrt(2.0)
def _squeeze_var_with_prior(
var: np.ndarray,
df: float,
var_prior,
df_prior,
) -> np.ndarray:
"""Port of R limma's ``.squeezeVar``: posterior variance given prior."""
var = np.asarray(var, dtype=np.float64)
df_prior_arr = np.atleast_1d(df_prior).astype(np.float64)
var_prior_arr = np.atleast_1d(var_prior).astype(np.float64)
# (df.prior*var.prior + df*var) / (df.prior + df)
num = df_prior_arr * var_prior_arr + df * var
denom = df_prior_arr + df
# Broadcast scalar priors if needed.
return num / denom
def _roast_effects(
effects: np.ndarray,
gene_weights: np.ndarray | None,
set_statistic: str,
var_prior,
df_prior,
var_post,
nrot: int,
approx_zscore: bool,
legacy: bool,
rng: np.random.Generator,
chunk: int = 1000,
) -> dict:
"""
Rotation gene-set test given the effects matrix for one set.
Port of R limma's ``.roastEffects``. The first column of ``effects`` is
the primary (contrast) effect; the remaining columns are residual
effects. Rows are genes already subset to the gene set of interest.
"""
if legacy:
chunk = nrot
nset = effects.shape[0]
neffects = effects.shape[1]
df_residual = neffects - 1
df_total = np.asarray(df_prior, dtype=np.float64) + df_residual
# Observed z-statistics
modt = effects[:, 0] / np.sqrt(np.asarray(var_post, dtype=np.float64))
if approx_zscore and not legacy:
df_total_winsor = np.minimum(df_total, 10000.0)
modt = _zscore_t_bailey(modt, np.broadcast_to(df_total_winsor, modt.shape))
else:
modt = zscore_t(modt, df_total, approx=approx_zscore, method="hill")
# Active proportions
if gene_weights is None:
a1 = float(np.mean(modt > _SQRT2))
a2 = float(np.mean(modt < -_SQRT2))
else:
s = np.sign(gene_weights)
ss = float(np.sum(np.abs(s)))
a1 = float(np.sum(s * modt > _SQRT2)) / ss if ss > 0 else 0.0
a2 = float(np.sum(s * modt < -_SQRT2)) / ss if ss > 0 else 0.0
# Observed set statistics
statobs = np.zeros(4)
if set_statistic not in ("mean", "floormean", "mean50", "msq"):
raise ValueError(
f"set.statistic '{set_statistic}' not recognized. "
"Must be 'mean', 'floormean', 'mean50' or 'msq'."
)
chimed = stats.norm.isf(0.25)
if set_statistic == "mean":
modt_use = gene_weights * modt if gene_weights is not None else modt
m = float(np.mean(modt_use))
statobs[0] = -m
statobs[1] = m
statobs[3] = float(np.mean(np.abs(modt_use)))
modt = modt_use # Pass to rotation loop
elif set_statistic == "floormean":
amodt = np.maximum(np.abs(modt), chimed)
if gene_weights is not None:
amodt = gene_weights * amodt
modt = gene_weights * modt
statobs[0] = float(np.mean(np.maximum(-modt, 0)))
statobs[1] = float(np.mean(np.maximum(modt, 0)))
statobs[2] = float(max(statobs[0], statobs[1]))
statobs[3] = float(np.mean(amodt))
elif set_statistic == "mean50":
if nset % 2 == 0:
half1 = nset // 2
half2 = (
half1 + 1
) # 1-based in R -> still means index half1 in 0-based for bottom half start
else:
half1 = nset // 2 + 1
half2 = half1
# R uses sort(modt, partial=half2); top half = s[1:half1], bottom = s[half2:nset]
if gene_weights is not None:
modt = gene_weights * modt
s_sorted = np.sort(modt)
# 0-based: top half = s_sorted[0:half1], bottom half = s_sorted[half2-1:nset]
statobs[0] = float(-np.mean(s_sorted[:half1]))
statobs[1] = float(np.mean(s_sorted[half2 - 1 : nset]))
statobs[2] = float(max(statobs[0], statobs[1]))
s_sorted = np.sort(np.abs(modt))
statobs[3] = float(np.mean(s_sorted[half2 - 1 : nset]))
elif set_statistic == "msq":
modt2 = modt**2
if gene_weights is not None:
modt2 = np.abs(gene_weights) * modt2
modt = gene_weights * modt
statobs[0] = float(np.sum(modt2[modt < 0]) / nset)
statobs[1] = float(np.sum(modt2[modt > 0]) / nset)
statobs[2] = float(max(statobs[0], statobs[1]))
statobs[3] = float(np.mean(modt2))
# Rotation loop
nchunk = int(np.ceil(nrot / chunk))
nroti = int(np.ceil(nrot / nchunk))
overshoot = nchunk * nroti - nrot
count = np.zeros(4, dtype=np.int64)
FinDf = np.isfinite(np.asarray(df_prior, dtype=np.float64))
FinDf = np.atleast_1d(FinDf)
df_prior_arr = np.atleast_1d(np.asarray(df_prior, dtype=np.float64))
var_prior_arr = np.atleast_1d(np.asarray(var_prior, dtype=np.float64))
half1_loc = None
half2_loc = None
if set_statistic == "mean50":
if nset % 2 == 0:
half1_loc = nset // 2
half2_loc = half1_loc + 1
else:
half1_loc = nset // 2 + 1
half2_loc = half1_loc
df_total_winsor = None
if approx_zscore and not legacy:
df_total_winsor = np.minimum(df_total, 10000.0)
for chunki in range(nchunk):
if chunki == nchunk - 1:
nroti_this = nroti - overshoot
else:
nroti_this = nroti
statrot = np.zeros((nroti_this, 4))
# Rotated primary effects: modtr has shape (nset, nroti_this)
R_mat = rng.standard_normal((nroti_this, neffects))
R_mat = R_mat / np.sqrt(np.sum(R_mat**2, axis=1, keepdims=True))
modtr = effects @ R_mat.T # (nset, nroti_this)
# Moderated rotated variances
if np.all(FinDf):
s2r = (np.sum(effects**2, axis=1, keepdims=True) - modtr**2) / df_residual
df_prior_b = df_prior_arr.reshape(-1, 1) if df_prior_arr.size > 1 else df_prior_arr[0]
var_prior_b = (
var_prior_arr.reshape(-1, 1) if var_prior_arr.size > 1 else var_prior_arr[0]
)
df_total_b = (
df_total.reshape(-1, 1) if np.ndim(df_total) and df_total.size > 1 else df_total
)
s2r = (df_prior_b * var_prior_b + df_residual * s2r) / df_total_b
elif np.any(FinDf):
s2r = (np.sum(effects**2, axis=1, keepdims=True) - modtr**2) / df_residual
if var_prior_arr.size > 1:
s20 = var_prior_arr[FinDf].reshape(-1, 1)
else:
s20 = var_prior_arr[0]
df_prior_sub = df_prior_arr[FinDf].reshape(-1, 1)
df_total_sub = df_total[FinDf].reshape(-1, 1)
s2r[FinDf, :] = (df_prior_sub * s20 + df_residual * s2r[FinDf, :]) / df_total_sub
else:
s2r = np.full_like(modtr, var_prior_arr[0] if var_prior_arr.size == 1 else 1.0)
if var_prior_arr.size > 1:
s2r = np.broadcast_to(var_prior_arr.reshape(-1, 1), modtr.shape).copy()
# Rotated z-statistics
modtr = modtr / np.sqrt(s2r)
if approx_zscore and not legacy:
dwinsor = np.broadcast_to(df_total_winsor.reshape(-1, 1), modtr.shape)
modtr = _zscore_t_bailey(modtr, dwinsor)
else:
df_total_b = (
np.broadcast_to(np.asarray(df_total).reshape(-1, 1), modtr.shape)
if np.ndim(df_total) and np.asarray(df_total).size > 1
else df_total
)
modtr = zscore_t(modtr, df_total_b, approx=approx_zscore, method="hill")
if set_statistic == "mean":
if gene_weights is not None:
modtr = gene_weights[:, np.newaxis] * modtr
m_r = np.mean(modtr, axis=0)
statrot[:, 0] = -m_r
statrot[:, 1] = m_r
statrot[:, 3] = np.mean(np.abs(modtr), axis=0)
count[0] += int(np.sum(statrot[:, 0] > statobs[0]) + np.sum(statrot[:, 1] > statobs[0]))
count[1] += int(np.sum(statrot[:, 0] > statobs[1]) + np.sum(statrot[:, 1] > statobs[1]))
count[3] += int(np.sum(statrot[:, 3] > statobs[3]))
elif set_statistic == "floormean":
amodtr = np.maximum(np.abs(modtr), chimed)
if gene_weights is not None:
amodtr = gene_weights[:, np.newaxis] * amodtr
modtr = gene_weights[:, np.newaxis] * modtr
statrot[:, 0] = np.mean(np.maximum(-modtr, 0), axis=0)
statrot[:, 1] = np.mean(np.maximum(modtr, 0), axis=0)
ub = statrot[:, 1] > statrot[:, 0]
statrot[ub, 2] = statrot[ub, 1]
statrot[~ub, 2] = statrot[~ub, 0]
statrot[:, 3] = np.mean(amodtr, axis=0)
count[0] += int(np.sum(statrot[:, 0] > statobs[0]) + np.sum(statrot[:, 1] > statobs[0]))
count[1] += int(np.sum(statrot[:, 0] > statobs[1]) + np.sum(statrot[:, 1] > statobs[1]))
count[2] += int(np.sum(statrot[:, 2] > statobs[2]))
count[3] += int(np.sum(statrot[:, 3] > statobs[3]))
elif set_statistic == "mean50":
if gene_weights is not None:
modtr = gene_weights[:, np.newaxis] * modtr
for r in range(nroti_this):
s_sorted = np.sort(modtr[:, r])
statrot[r, 0] = -np.mean(s_sorted[:half1_loc])
statrot[r, 1] = np.mean(s_sorted[half2_loc - 1 : nset])
statrot[r, 2] = max(statrot[r, 0], statrot[r, 1])
s_sorted = np.sort(np.abs(modtr[:, r]))
statrot[r, 3] = np.mean(s_sorted[half2_loc - 1 : nset])
count[0] += int(np.sum(statrot[:, 0] > statobs[0]) + np.sum(statrot[:, 1] > statobs[0]))
count[1] += int(np.sum(statrot[:, 0] > statobs[1]) + np.sum(statrot[:, 1] > statobs[1]))
count[2] += int(np.sum(statrot[:, 2] > statobs[2]))
count[3] += int(np.sum(statrot[:, 3] > statobs[3]))
elif set_statistic == "msq":
if gene_weights is not None:
gw_sqrt = np.sqrt(np.abs(gene_weights))
modtr = gw_sqrt[:, np.newaxis] * modtr
statrot[:, 0] = np.mean(np.maximum(-modtr, 0) ** 2, axis=0)
statrot[:, 1] = np.mean(np.maximum(modtr, 0) ** 2, axis=0)
ub = statrot[:, 1] > statrot[:, 0]
statrot[ub, 2] = statrot[ub, 1]
statrot[~ub, 2] = statrot[~ub, 0]
statrot[:, 3] = np.mean(modtr**2, axis=0)
count[0] += int(np.sum(statrot[:, 0] > statobs[0]) + np.sum(statrot[:, 1] > statobs[0]))
count[1] += int(np.sum(statrot[:, 0] > statobs[1]) + np.sum(statrot[:, 1] > statobs[1]))
count[2] += int(np.sum(statrot[:, 2] > statobs[2]))
count[3] += int(np.sum(statrot[:, 3] > statobs[3]))
if set_statistic == "mean":
count[2] = int(min(count[0], count[1]))
# Denominators: (2,2,1,1)*nrot + 1
denom = np.array([2 * nrot + 1, 2 * nrot + 1, nrot + 1, nrot + 1], dtype=np.float64)
p = (count + 1) / denom
# Assemble data.frame-like output matching R:
# rows: Down, Up, UpOrDown, Mixed; cols: Active.Prop, P.Value
active = np.array([a2, a1, max(a1, a2), a1 + a2])
pvals = p
out_df = pd.DataFrame(
{"active_prop": active, "p_value": pvals},
index=["Down", "Up", "UpOrDown", "Mixed"],
)
return {"p_value": out_df, "ngenes_in_set": nset}
def _resolve_rng(rng):
if rng is None:
return np.random.default_rng()
if isinstance(rng, np.random.Generator):
return rng
return np.random.default_rng(rng)
def _prep_roast_inputs(
y,
design,
contrast,
gene_weights,
var_prior,
df_prior,
covariate_trend,
lm_kwargs: dict,
):
Effects = _lm_effects(
y,
design=design,
contrast=contrast,
array_weights=lm_kwargs.get("array_weights"),
weights=lm_kwargs.get("weights"),
block=lm_kwargs.get("block"),
correlation=lm_kwargs.get("correlation"),
)
ngenes = Effects.shape[0]
df_residual = Effects.shape[1] - 1
s2 = np.mean(Effects[:, 1:] ** 2, axis=1)
if var_prior is None or df_prior is None:
sv = squeeze_var(
s2,
df_residual,
covariate=covariate_trend,
robust=lm_kwargs.get("robust", False),
winsor_tail_p=lm_kwargs.get("winsor_tail_p", (0.05, 0.1)),
)
var_prior_out = sv["var_prior"]
df_prior_out = sv["df_prior"]
var_post = sv["var_post"]
else:
var_prior_out = var_prior
df_prior_out = df_prior
var_post = _squeeze_var_with_prior(s2, df_residual, var_prior, df_prior)
return Effects, ngenes, df_residual, var_prior_out, df_prior_out, var_post
def _resolve_set_index(idx, ngenes):
"""Convert a gene-set selector to a NumPy int array of 0-based positions."""
if idx is None:
return np.arange(ngenes, dtype=np.int64)
arr = np.asarray(idx)
if arr.ndim == 0:
arr = arr.reshape(1)
if arr.dtype.kind == "b":
return np.where(arr)[0].astype(np.int64)
return arr.astype(np.int64, copy=True)
[docs]
def roast(
y,
index=None,
design=None,
contrast=None,
geneid=None,
set_statistic: str = "mean",
gene_weights: np.ndarray | None = None,
var_prior: float | None = None,
df_prior: float | None = None,
nrot: int = 1999,
approx_zscore: bool = True,
legacy: bool = False,
rng=None,
**lmfit_kwargs,
):
"""
Rotation gene-set test (single set).
Port of R limma's ``roast.default``. When ``index`` is a dict/list of
sets, dispatches to :func:`mroast`.
Parameters
----------
y : ndarray, EList, AnnData, or DataFrame
Expression data.
index : array_like of int, dict, or list of array_like, optional
Set members (0-based indices). A dict or list of arrays is treated
as multiple sets and routed to :func:`mroast`.
design : array_like, optional
Design matrix. Defaults to ``y.design`` if available.
contrast : int or array_like, optional
Column index (0-based) or contrast vector. Defaults to the last
column of ``design``.
geneid : str or array_like, optional
Optional gene identifier vector (or column name in ``y.genes``).
set_statistic : {"mean", "floormean", "mean50", "msq"}, default "mean"
Summary statistic of the moderated z-scores.
gene_weights : array_like, optional
Per-gene weights.
var_prior, df_prior : float, optional
Hyperparameters. If either is ``None``, both are estimated by
:func:`squeeze_var`.
nrot : int, default 1999
Number of rotations.
approx_zscore : bool, default True
Use an approximation for the z-score transform.
legacy : bool, default False
If True, use ``zscore_t(..., method="hill")`` for all z-score
computations (matches R's pre-2019 behaviour).
rng : int, numpy.random.Generator, or None
Random-number stream. Deterministic outputs match R; Monte-Carlo
p-values use this stream.
Returns
-------
dict
``{"p_value": DataFrame(Active.Prop, P.Value), "ngenes_in_set": int}``.
"""
# If index is a list of sets, dispatch to mroast
if isinstance(index, dict) or (
isinstance(index, list)
and len(index)
and all(hasattr(x, "__len__") and not isinstance(x, str) for x in index)
):
return mroast(
y,
index=index,
design=design,
contrast=contrast,
geneid=geneid,
set_statistic=set_statistic,
gene_weights=gene_weights,
var_prior=var_prior,
df_prior=df_prior,
nrot=nrot,
approx_zscore=approx_zscore,
legacy=legacy,
rng=rng,
**lmfit_kwargs,
)
rng_state = _resolve_rng(rng)
# Trend covariate
covariate = None
if lmfit_kwargs.get("trend", False) or lmfit_kwargs.get("trend_var", False):
covariate = np.mean(np.asarray(get_eawp(y)["exprs"]), axis=1)
Effects, ngenes, df_residual, vp, dp, var_post = _prep_roast_inputs(
y, design, contrast, gene_weights, var_prior, df_prior, covariate, lmfit_kwargs
)
# Subset to gene set
idx = _resolve_set_index(index, ngenes)
Effects_set = Effects[idx, :]
vp_set = vp[idx] if np.ndim(vp) and np.size(vp) > 1 else vp
dp_set = dp[idx] if np.ndim(dp) and np.size(dp) > 1 else dp
var_post_set = var_post[idx] if np.ndim(var_post) and np.size(var_post) > 1 else var_post
NGenesInSet = Effects_set.shape[0]
# gene_weights alignment
if gene_weights is not None:
gw = np.asarray(gene_weights, dtype=np.float64).copy()
if gw.size not in (NGenesInSet, ngenes):
raise ValueError(
"length of gene.weights doesn't agree with number of genes or size of set"
)
if gw.size == ngenes:
gw = gw[idx]
else:
gw = None
return _roast_effects(
Effects_set,
gene_weights=gw,
set_statistic=set_statistic,
var_prior=vp_set,
df_prior=dp_set,
var_post=var_post_set,
nrot=nrot,
approx_zscore=approx_zscore,
legacy=legacy,
rng=rng_state,
)
[docs]
def mroast(
y,
index,
design=None,
contrast=None,
geneid=None,
set_statistic: str = "mean",
gene_weights: np.ndarray | None = None,
var_prior: float | None = None,
df_prior: float | None = None,
nrot: int = 1999,
approx_zscore: bool = True,
legacy: bool = False,
adjust_method: str = "BH",
midp: bool = True,
sort: str = "directional",
rng=None,
**lmfit_kwargs,
) -> pd.DataFrame:
"""
Rotation gene-set test over many sets.
Port of R limma's ``mroast.default``. See :func:`roast` for the
single-set API; this version returns a :class:`pandas.DataFrame`.
The ``sort`` argument accepts ``True``/``False`` (aliased to
``"directional"`` / ``"none"``) as well as the R strings.
"""
rng_state = _resolve_rng(rng)
covariate = None
if lmfit_kwargs.get("trend", False):
covariate = np.mean(np.asarray(get_eawp(y)["exprs"]), axis=1)
Effects, ngenes, df_residual, vp, dp, var_post = _prep_roast_inputs(
y, design, contrast, gene_weights, var_prior, df_prior, covariate, lmfit_kwargs
)
# Normalise index to a dict of label -> 0-based int index array.
if index is None:
index = {"set1": np.arange(ngenes, dtype=np.int64)}
elif isinstance(index, dict):
pass
elif (
isinstance(index, list)
and len(index)
and all(hasattr(x, "__len__") and not isinstance(x, str) for x in index)
):
index = {f"set{i + 1}": v for i, v in enumerate(index)}
else:
index = {"set1": index}
names = list(index.keys())
if len(set(names)) != len(names):
raise ValueError("Gene sets don't have unique names")
nsets = len(index)
if nsets == 0:
raise ValueError("index is empty")
if gene_weights is not None:
gw_full = np.asarray(gene_weights, dtype=np.float64).copy()
if gw_full.size != ngenes:
raise ValueError("gene.weights vector should be of length nrow(y)")
else:
gw_full = None
pv = np.zeros((nsets, 4))
active = np.zeros((nsets, 4))
NGenes = np.zeros(nsets, dtype=np.int64)
for i, name in enumerate(names):
g = _resolve_set_index(index[name], ngenes)
E = Effects[g, :]
vp_i = vp[g] if np.ndim(vp) and np.size(vp) > 1 else vp
dp_i = dp[g] if np.ndim(dp) and np.size(dp) > 1 else dp
vpost_i = var_post[g] if np.ndim(var_post) and np.size(var_post) > 1 else var_post
gw_i = gw_full[g] if gw_full is not None else None
out = _roast_effects(
E,
gene_weights=gw_i,
set_statistic=set_statistic,
var_prior=vp_i,
df_prior=dp_i,
var_post=vpost_i,
nrot=nrot,
approx_zscore=approx_zscore,
legacy=legacy,
rng=rng_state,
)
# R order: Down, Up, UpOrDown, Mixed
pv[i, :] = out["p_value"]["p_value"].values
active[i, :] = out["p_value"]["active_prop"].values
NGenes[i] = out["ngenes_in_set"]
Up_wins = pv[:, 1] < pv[:, 0]
Direction = np.where(Up_wins, "Up", "Down")
TwoSidedP2 = pv[:, 2].copy()
MixedP2 = pv[:, 3].copy()
if midp:
TwoSidedP2 = TwoSidedP2 - 1.0 / (2.0 * (nrot + 1))
MixedP2 = MixedP2 - 1.0 / (2.0 * (nrot + 1))
tab = pd.DataFrame(
{
"n_genes": NGenes,
"prop_down": active[:, 0],
"prop_up": active[:, 1],
"direction": Direction,
"p_value": pv[:, 2],
"fdr": p_adjust(TwoSidedP2, method=adjust_method),
"p_value_mixed": pv[:, 3],
"fdr_mixed": p_adjust(MixedP2, method=adjust_method),
},
index=names,
)
if midp:
tab["fdr"] = np.maximum(tab["fdr"].values, pv[:, 2])
tab["fdr_mixed"] = np.maximum(tab["fdr_mixed"].values, pv[:, 3])
# Sort
if isinstance(sort, bool):
sort = "directional" if sort else "none"
if sort not in ("directional", "mixed", "none"):
raise ValueError(f"sort '{sort}' not recognized. Must be 'directional', 'mixed' or 'none'.")
if sort == "none":
return tab
if sort == "directional":
prop = np.maximum(tab["prop_up"].values, tab["prop_down"].values)
o = np.lexsort(
(tab["p_value_mixed"].values, -NGenes.astype(float), -prop, tab["p_value"].values)
)
else:
prop = tab["prop_up"].values + tab["prop_down"].values
o = np.lexsort(
(tab["p_value"].values, -NGenes.astype(float), -prop, tab["p_value_mixed"].values)
)
return tab.iloc[o]
# ---------------------------------------------------------------------------
# fry
# ---------------------------------------------------------------------------
def _fry_effects(
effects: np.ndarray,
index: dict,
geneid,
gene_weights,
sort,
) -> pd.DataFrame:
G = effects.shape[0]
neffects = effects.shape[1]
df_residual = neffects - 1
names = list(index.keys())
nsets = len(names)
if len(set(names)) != len(names):
raise ValueError("Gene sets don't have unique names")
NGenes = np.zeros(nsets, dtype=np.int64)
PValue_Mixed = np.zeros(nsets)
t_stat = np.zeros(nsets)
for i, nm in enumerate(names):
iset = _resolve_set_index(index[nm], G)
EffectsSet = effects[iset, :].copy()
if gene_weights is not None:
iw = gene_weights[iset]
EffectsSet = iw[:, np.newaxis] * EffectsSet
MeanEffectsSet = np.mean(EffectsSet, axis=0)
denom = np.sqrt(np.mean(MeanEffectsSet[1:] ** 2))
t_stat[i] = MeanEffectsSet[0] / denom if denom > 0 else np.nan
NGenes[i] = EffectsSet.shape[0]
if NGenes[i] > 1:
# SVD of EffectsSet; we need the singular values only.
# R: SVD <- svd(EffectsSet, nu=0); A <- SVD$d^2
sv = np.linalg.svd(EffectsSet, compute_uv=False)
A = sv**2
d1 = A.size
d = d1 - 1
beta_mean = 1.0 / d1
beta_var = d / d1 / d1 / (d1 / 2.0 + 1.0)
Fobs = (np.sum(EffectsSet[:, 0] ** 2) - A[-1]) / (A[0] - A[-1])
Frb_mean = (np.sum(A) * beta_mean - A[-1]) / (A[0] - A[-1])
COV = np.full((d1, d1), -beta_var / d)
np.fill_diagonal(COV, beta_var)
Frb_var = float(A @ COV @ A) / (A[0] - A[-1]) ** 2
alphaplusbeta = Frb_mean * (1.0 - Frb_mean) / Frb_var - 1.0
alpha = alphaplusbeta * Frb_mean
beta_param = alphaplusbeta - alpha
PValue_Mixed[i] = stats.beta.sf(Fobs, alpha, beta_param)
Direction = np.where(t_stat < 0, "Down", "Up")
PValue = 2.0 * stats.t.cdf(-np.abs(t_stat), df=df_residual)
singleton = NGenes == 1
PValue_Mixed[singleton] = PValue[singleton]
if nsets > 1:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"direction": Direction,
"p_value": PValue,
"fdr": p_adjust(PValue, method="BH"),
"p_value_mixed": PValue_Mixed,
"fdr_mixed": p_adjust(PValue_Mixed, method="BH"),
},
index=names,
)
else:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"direction": Direction,
"p_value": PValue,
"p_value_mixed": PValue_Mixed,
},
index=names,
)
if isinstance(sort, bool):
sort = "directional" if sort else "none"
if sort not in ("directional", "mixed", "none"):
raise ValueError(f"sort '{sort}' not recognized. Must be 'directional', 'mixed' or 'none'.")
if sort == "none":
return tab
if sort == "directional":
o = np.lexsort((tab["p_value_mixed"].values, -NGenes.astype(float), tab["p_value"].values))
else:
o = np.lexsort((tab["p_value"].values, -NGenes.astype(float), tab["p_value_mixed"].values))
return tab.iloc[o]
[docs]
def fry(
y,
index=None,
design=None,
contrast=None,
geneid=None,
gene_weights: np.ndarray | None = None,
standardize: str = "posterior.sd",
sort="directional",
**lmfit_kwargs,
) -> pd.DataFrame:
"""
Fast closed-form limit of ``roast`` (``nrot -> Inf`` with ``df.prior=Inf``).
Port of R limma's ``fry.default``.
"""
if gene_weights is not None:
gw = np.asarray(gene_weights, dtype=np.float64)
ea = get_eawp(y)
if gw.size != ea["exprs"].shape[0]:
raise ValueError("length of gene.weights should equal nrow(y)")
else:
gw = None
if standardize not in ("none", "residual.sd", "posterior.sd", "p2"):
raise ValueError(
f"standardize '{standardize}' not recognized. "
"Must be 'none', 'residual.sd', 'posterior.sd' or 'p2'."
)
covariate = None
if lmfit_kwargs.get("trend", False):
covariate = np.mean(np.asarray(get_eawp(y)["exprs"]), axis=1)
Effects = _lm_effects(
y,
design=design,
contrast=contrast,
array_weights=lmfit_kwargs.get("array_weights"),
weights=lmfit_kwargs.get("weights"),
block=lmfit_kwargs.get("block"),
correlation=lmfit_kwargs.get("correlation"),
)
G = Effects.shape[0]
df_residual = Effects.shape[1] - 1
if standardize != "none":
# Gauss-Legendre 128 nodes on [0,1]; Eu2max in R uses "uniform" dist.
gq_nodes, gq_wts = np.polynomial.legendre.leggauss(128)
gq_nodes = (gq_nodes + 1) / 2.0
gq_wts = gq_wts / 2.0
Eu2max = float(
np.sum(
(df_residual + 1) * gq_nodes**df_residual * stats.chi2.ppf(gq_nodes, df=1) * gq_wts
)
)
u2max = np.max(Effects**2, axis=1)
s2_robust = (np.sum(Effects**2, axis=1) - u2max) / (df_residual + 1 - Eu2max)
if standardize == "p2":
sv = squeeze_var(
s2_robust,
df=0.92 * df_residual,
covariate=covariate,
robust=lmfit_kwargs.get("robust", False),
winsor_tail_p=lmfit_kwargs.get("winsor_tail_p", (0.05, 0.1)),
)
s2_robust = sv["var_post"]
elif standardize == "posterior.sd":
s2 = np.mean(Effects[:, 1:] ** 2, axis=1)
if lmfit_kwargs.get("robust", False):
# fitFDistRobustly path - use squeeze_var with robust=True and
# lift out the prior var / df it estimates.
sv = squeeze_var(
s2,
df=df_residual,
covariate=covariate,
robust=True,
winsor_tail_p=lmfit_kwargs.get("winsor_tail_p", (0.05, 0.1)),
)
df_prior = sv["df_prior"]
var_prior = sv["var_prior"]
else:
fit = fit_f_dist(s2, df1=df_residual, covariate=covariate)
df_prior = fit["df2"]
var_prior = fit["scale"]
s2_robust = _squeeze_var_with_prior(
s2_robust, df=0.92 * df_residual, var_prior=var_prior, df_prior=df_prior
)
# "residual.sd": use s2_robust as-is
Effects = Effects / np.sqrt(s2_robust)[:, np.newaxis]
# Normalise index
if index is None:
index = {"set1": np.arange(G, dtype=np.int64)}
elif isinstance(index, dict):
pass
elif (
isinstance(index, list)
and len(index)
and all(hasattr(x, "__len__") and not isinstance(x, str) for x in index)
):
index = {f"set{i + 1}": v for i, v in enumerate(index)}
else:
index = {"set1": index}
return _fry_effects(Effects, index, geneid=geneid, gene_weights=gw, sort=sort)
# ---------------------------------------------------------------------------
# camera / inter_gene_correlation / camera_pr
# ---------------------------------------------------------------------------
[docs]
def inter_gene_correlation(y: np.ndarray, design: np.ndarray) -> dict:
"""
Variance-inflation factor and inter-gene correlation.
Port of R limma's ``interGeneCorrelation``. Returns
``{"vif": ..., "correlation": ...}``.
"""
y = np.asarray(y, dtype=np.float64)
design = np.asarray(design, dtype=np.float64)
m = y.shape[0]
Q, R = linalg.qr(design, mode="full")
rank = int(np.sum(np.abs(np.diag(R)) > 1e-10))
# R: y <- qr.qty(qrdesign, t(y))[-(1:rank),] -> residual rows of Q^T y^T
resid = (Q.T @ y.T)[rank:, :]
# resid has shape (n - rank, m). R: y <- t(y) / sqrt(colMeans(y^2))
# colMeans along dim=1 of resid gives per-gene mean of residuals^2.
col_means = np.mean(resid**2, axis=0)
col_means = np.where(col_means > 0, col_means, 1e-300)
ny = resid / np.sqrt(col_means) # shape (n-rank, m)
# vif = m * mean(colMeans(y)^2); here colMeans on ny is along axis=0
# but after t() -> t() in R, it's the mean across residual rows per gene.
vif = m * float(np.mean(np.mean(ny, axis=0) ** 2))
correlation = (vif - 1) / (m - 1)
return {"vif": vif, "correlation": correlation}
[docs]
def camera(
y,
index,
design=None,
contrast=None,
weights: np.ndarray | None = None,
use_ranks: bool = False,
allow_neg_cor: bool = False,
inter_gene_cor: float | None = 0.01,
trend_var: bool = False,
sort: bool = True,
directional: bool = True,
**kwargs,
) -> pd.DataFrame:
"""
Competitive gene-set test with inter-gene correlation.
Port of R limma's ``camera.default``.
"""
if kwargs:
warnings.warn(f"Extra arguments disregarded: {list(kwargs.keys())}")
ea = get_eawp(y)
y_mat = ea["exprs"]
G, n = y_mat.shape
ID = None
if ea.get("probes") is not None and hasattr(ea["probes"], "index"):
ID = list(ea["probes"].index)
if G < 3:
raise ValueError("Too few genes in dataset: need at least 3")
if not isinstance(index, dict):
if (
isinstance(index, list)
and len(index)
and all(hasattr(x, "__len__") and not isinstance(x, str) for x in index)
):
index = {f"set{i + 1}": v for i, v in enumerate(index)}
else:
index = {"set1": index}
nsets = len(index)
if nsets == 0:
raise ValueError("index is empty")
if design is None:
design = ea.get("design")
if design is None:
raise ValueError("design matrix not specified")
design = np.asarray(design, dtype=np.float64)
if design.shape[0] != n:
raise ValueError("row dimension of design matrix must match column dimension of data")
p = design.shape[1]
df_residual = n - p
if df_residual < 1:
raise ValueError("No residual df: cannot compute t-tests")
if weights is None:
weights = ea.get("weights")
fixed_cor = inter_gene_cor is not None and not (
np.ndim(inter_gene_cor) == 0 and np.isnan(inter_gene_cor)
)
if not directional:
if not use_ranks:
use_ranks = True
warnings.warn("Setting `use.ranks=TRUE` for non-directional tests")
if fixed_cor:
df_camera = np.inf if use_ranks else G - 2
else:
df_camera = min(df_residual, G - 2)
# Weight handling (array weights first)
y_use = y_mat.copy()
design_use = design.copy()
if weights is not None:
w_arr = np.asarray(weights, dtype=np.float64)
if np.any(w_arr <= 0):
raise ValueError("weights must be positive")
if w_arr.size == n:
sw = np.sqrt(w_arr)
y_use = y_use * sw
design_use = design_use * sw[:, np.newaxis]
weights = None
else:
if w_arr.size == G:
weights = np.broadcast_to(w_arr[:, np.newaxis], (G, n)).copy()
else:
weights = w_arr
if weights.shape != y_use.shape:
raise ValueError("weights not conformal with y")
# Reform design so contrast is the last column.
if contrast is None:
contrast = p - 1
contrast_arr = np.atleast_1d(np.asarray(contrast))
if contrast_arr.size == 1 and np.issubdtype(contrast_arr.dtype, np.integer):
k = int(contrast_arr.item())
if k < p - 1:
keep = [j for j in range(p) if j != k]
design_use = np.column_stack([design_use[:, keep], design_use[:, k : k + 1]])
else:
contrast_vec = np.asarray(contrast, dtype=np.float64)
Q_c, R_c = linalg.qr(contrast_vec.reshape(-1, 1), mode="full")
rotated = design_use @ Q_c
if np.sign(R_c[0, 0]) < 0:
rotated[:, 0] = -rotated[:, 0]
design_use = np.column_stack([rotated[:, 1:], rotated[:, 0:1]])
if weights is None:
Q, R_qr = linalg.qr(design_use, mode="full")
rank = int(np.sum(np.abs(np.diag(R_qr)) > 1e-10))
if rank < p:
raise ValueError("design matrix is not of full rank")
effects = Q.T @ y_use.T # (n, G)
unscaledt = effects[p - 1, :].copy()
if R_qr[p - 1, p - 1] < 0:
unscaledt = -unscaledt
else:
effects = np.zeros((n, G))
unscaledt = np.zeros(G)
sw_mat = np.sqrt(weights)
y_w = y_use * sw_mat
for g in range(G):
xw = design_use * sw_mat[g, :, np.newaxis]
Qg, Rg = linalg.qr(xw, mode="full")
rankg = int(np.sum(np.abs(np.diag(Rg)) > 1e-10))
if rankg < p:
raise ValueError(f"weighted design matrix not of full rank for gene {g}")
effects[:, g] = Qg.T @ y_w[g, :]
unscaledt[g] = effects[p - 1, g]
if Rg[p - 1, p - 1] < 0:
unscaledt[g] = -unscaledt[g]
# Standardised residuals
U = effects[p:, :]
sigma2 = np.mean(U**2, axis=0)
U_t = U.T / np.sqrt(np.maximum(sigma2, 1e-8))[:, None] # (G, n-p)
A = np.mean(y_mat, axis=1) if trend_var else None
sv = squeeze_var(sigma2, df=df_residual, covariate=A)
var_post = sv["var_post"]
df_prior = sv["df_prior"]
modt = unscaledt / np.sqrt(var_post)
if use_ranks:
Stat = modt.copy()
if not directional:
Stat = np.abs(Stat)
else:
df_total = min(df_residual + df_prior, G * df_residual)
Stat = zscore_t(modt, df=df_total, approx=True, method="hill")
if not use_ranks:
meanStat = float(np.mean(Stat))
varStat = float(np.var(Stat, ddof=1))
NGenes = np.zeros(nsets, dtype=np.int64)
Correlation = np.zeros(nsets)
Down = np.zeros(nsets)
Up = np.zeros(nsets)
names = list(index.keys())
for i, nm in enumerate(names):
iset = index[nm]
iset = np.asarray(iset)
if iset.dtype.kind in ("U", "S", "O") and ID is not None:
iset = np.array([ID.index(s) for s in iset if s in ID], dtype=np.int64)
else:
iset = iset.astype(np.int64, copy=False)
StatInSet = Stat[iset]
m = iset.size
m2 = G - m
if fixed_cor:
corr = inter_gene_cor if np.ndim(inter_gene_cor) == 0 else inter_gene_cor[i]
vif = 1 + (m - 1) * corr
else:
if m > 1:
Uset = U_t[iset, :]
vif = m * float(np.mean(np.mean(Uset, axis=0) ** 2))
corr = (vif - 1) / (m - 1)
else:
vif = 1.0
corr = np.nan
NGenes[i] = m
Correlation[i] = corr
if use_ranks:
corr_use = max(0, corr) if not allow_neg_cor else corr
pv = rank_sum_test_with_correlation(
iset, statistics=Stat, correlation=corr_use, df=df_camera
)
Down[i] = pv["less"]
Up[i] = pv["greater"]
else:
if not allow_neg_cor:
vif = max(1, vif)
meanStatInSet = float(np.mean(StatInSet))
delta = G / m2 * (meanStatInSet - meanStat)
varStatPooled = ((G - 1) * varStat - delta**2 * m * m2 / G) / (G - 2)
two_sample_t = delta / np.sqrt(varStatPooled * (vif / m + 1.0 / m2))
if np.isinf(df_camera):
Down[i] = stats.norm.cdf(two_sample_t)
Up[i] = stats.norm.sf(two_sample_t)
else:
Down[i] = stats.t.cdf(two_sample_t, df=df_camera)
Up[i] = stats.t.sf(two_sample_t, df=df_camera)
TwoSided = 2 * np.minimum(Down, Up)
if directional:
Direction = np.where(Down < Up, "Down", "Up")
if fixed_cor:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"direction": Direction,
"p_value": TwoSided,
},
index=names,
)
else:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"correlation": Correlation,
"direction": Direction,
"p_value": TwoSided,
},
index=names,
)
else:
if fixed_cor:
tab = pd.DataFrame(
{"n_genes": NGenes, "p_value": Up},
index=names,
)
else:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"correlation": Correlation,
"p_value": Up,
},
index=names,
)
if nsets > 1:
tab["fdr"] = p_adjust(tab["p_value"].values, method="BH")
if sort and nsets > 1:
o = np.argsort(tab["p_value"].values, kind="stable")
tab = tab.iloc[o]
return tab
[docs]
def camera_pr(
statistic,
index,
use_ranks: bool = False,
inter_gene_cor=0.01,
sort: bool = True,
directional: bool = True,
**kwargs,
) -> pd.DataFrame:
"""
Pre-ranked competitive gene-set test.
Port of R limma's ``cameraPR.default``.
"""
if kwargs:
warnings.warn(f"Extra arguments disregarded: {list(kwargs.keys())}")
if isinstance(statistic, dict):
raise TypeError("statistic should be a numeric vector")
ID = None
stat_arr: np.ndarray
if isinstance(statistic, pd.Series):
ID = list(statistic.index)
stat_arr = statistic.values.astype(np.float64)
else:
stat_arr = np.asarray(statistic, dtype=np.float64)
if np.any(np.isnan(stat_arr)):
raise ValueError("NA values for statistic not allowed")
G = stat_arr.size
if G < 3:
raise ValueError("Too few genes in dataset: need at least 3")
if not isinstance(index, dict):
if (
isinstance(index, list)
and len(index)
and all(hasattr(x, "__len__") and not isinstance(x, str) for x in index)
):
index = {f"set{i + 1}": v for i, v in enumerate(index)}
else:
index = {"set1": index}
nsets = len(index)
# inter_gene_cor handling
igc = np.atleast_1d(np.asarray(inter_gene_cor, dtype=np.float64))
if np.any(np.isnan(igc)):
raise ValueError("NA inter.gene.cor not allowed")
if np.max(np.abs(igc)) >= 1:
raise ValueError("`inter.gene.cor` must be strictly between -1 and 1")
if igc.size > 1:
if igc.size != nsets:
raise ValueError("Length of `inter.gene.cor` doesn't match number of sets")
fixed_cor = False
igc_arr = igc
else:
fixed_cor = True
igc_arr = np.full(nsets, igc[0])
if not directional:
if np.min(stat_arr) < 0:
stat_arr = np.abs(stat_arr)
warnings.warn("Converting `statistic` to absolute values for non-directional tests")
if not use_ranks:
use_ranks = True
warnings.warn("Setting `use.ranks=TRUE` for non-directional tests")
df_camera = np.inf if use_ranks else G - 2
meanStat = float(np.mean(stat_arr))
varStat = float(np.var(stat_arr, ddof=1))
NGenes = np.zeros(nsets, dtype=np.int64)
Down = np.zeros(nsets)
Up = np.zeros(nsets)
names = list(index.keys())
for i, nm in enumerate(names):
iset = np.asarray(index[nm])
if iset.dtype.kind in ("U", "S", "O") and ID is not None:
iset = np.array([ID.index(s) for s in iset if s in ID], dtype=np.int64)
else:
iset = iset.astype(np.int64, copy=False)
StatInSet = stat_arr[iset]
m = iset.size
NGenes[i] = m
if use_ranks:
pv = rank_sum_test_with_correlation(
iset, statistics=stat_arr, correlation=igc_arr[i], df=df_camera
)
Down[i] = pv["less"]
Up[i] = pv["greater"]
else:
vif = 1 + (m - 1) * igc_arr[i]
m2 = G - m
meanStatInSet = float(np.mean(StatInSet))
delta = G / m2 * (meanStatInSet - meanStat)
varStatPooled = ((G - 1) * varStat - delta**2 * m * m2 / G) / (G - 2)
two_sample_t = delta / np.sqrt(varStatPooled * (vif / m + 1.0 / m2))
if np.isinf(df_camera):
Down[i] = stats.norm.cdf(two_sample_t)
Up[i] = stats.norm.sf(two_sample_t)
else:
Down[i] = stats.t.cdf(two_sample_t, df=df_camera)
Up[i] = stats.t.sf(two_sample_t, df=df_camera)
TwoSided = 2 * np.minimum(Down, Up)
if directional:
Direction = np.where(Down < Up, "Down", "Up")
if fixed_cor:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"direction": Direction,
"p_value": TwoSided,
},
index=names,
)
else:
tab = pd.DataFrame(
{
"n_genes": NGenes,
"correlation": igc_arr,
"direction": Direction,
"p_value": TwoSided,
},
index=names,
)
else:
if fixed_cor:
tab = pd.DataFrame(
{"n_genes": NGenes, "p_value": Up},
index=names,
)
else:
tab = pd.DataFrame(
{"n_genes": NGenes, "correlation": igc_arr, "p_value": Up},
index=names,
)
if nsets > 1:
tab["fdr"] = p_adjust(tab["p_value"].values, method="BH")
if sort and nsets > 1:
o = np.argsort(tab["p_value"].values, kind="stable")
tab = tab.iloc[o]
return tab
# ---------------------------------------------------------------------------
# romer
# ---------------------------------------------------------------------------
def _mean_half(x: np.ndarray, n: int) -> tuple:
"""Return (top_half_mean, bottom_half_mean) matching R's .meanHalf."""
l = x.size
a = np.sort(x)
top = float(np.mean(a[:n]))
if l % 2 == 0:
bottom = float(np.mean(a[n:]))
else:
bottom = float(np.mean(a[n - 1 :]))
return top, bottom
[docs]
def romer(
y,
index,
design=None,
contrast=None,
array_weights: np.ndarray | None = None,
block=None,
correlation: float | None = None,
set_statistic: str = "mean",
nrot: int = 9999,
shrink_resid: bool = True,
rng=None,
**kwargs,
) -> pd.DataFrame:
"""
Rotation mean-rank gene-set enrichment analysis.
Port of R limma's ``romer.default``.
"""
if kwargs:
warnings.warn(f"Extra arguments disregarded: {list(kwargs.keys())}")
rng_state = _resolve_rng(rng)
ea = get_eawp(y)
y_mat = ea["exprs"].copy()
ngenes, n = y_mat.shape
if not isinstance(index, dict):
if (
isinstance(index, list)
and len(index)
and all(hasattr(x, "__len__") and not isinstance(x, str) for x in index)
):
index = {f"set{i + 1}": v for i, v in enumerate(index)}
else:
index = {"set": index}
nsets = len(index)
if nsets == 0:
raise ValueError("index is empty")
set_names = list(index.keys())
SetSizes = np.array(
[_resolve_set_index(index[nm], ngenes).size for nm in set_names],
dtype=np.int64,
)
if design is None:
raise ValueError("design matrix not specified")
design = np.asarray(design, dtype=np.float64)
if design.shape[0] != n:
raise ValueError("row dimension of design matrix must match column dimension of data")
ne = non_estimable(design)
if ne is not None:
print("Coefficients not estimable:", " ".join(ne))
p = design.shape[1]
if p < 2:
raise ValueError("design needs at least two columns")
d = n - p
if contrast is None:
contrast = p - 1
contrast_arr = np.atleast_1d(np.asarray(contrast))
if contrast_arr.size == 1 and np.issubdtype(contrast_arr.dtype, np.integer):
k = int(contrast_arr.item())
if k < p - 1:
keep = [j for j in range(p) if j != k]
design = np.column_stack([design[:, keep], design[:, k : k + 1]])
else:
contrast_vec = np.asarray(contrast, dtype=np.float64)
Q_c, R_c = linalg.qr(contrast_vec.reshape(-1, 1), mode="full")
rotated = design @ Q_c
if np.sign(R_c[0, 0]) < 0:
rotated[:, 0] = -rotated[:, 0]
design = np.column_stack([rotated[:, 1:], rotated[:, 0:1]])
if array_weights is not None:
aw = np.asarray(array_weights, dtype=np.float64)
if np.any(aw <= 0):
raise ValueError("array.weights must be positive")
if aw.size != n:
raise ValueError("Length of array.weights doesn't match number of array")
design = design * np.sqrt(aw)[:, np.newaxis]
y_mat = y_mat * np.sqrt(aw)
if block is not None:
if correlation is None:
raise ValueError("correlation must be set")
block_vec = np.asarray(block)
if block_vec.size != n:
raise ValueError("Length of block does not match number of arrays")
ub = np.unique(block_vec)
Z = (block_vec[:, None] == ub[None, :]).astype(np.float64)
cormatrix = Z @ (correlation * Z.T)
np.fill_diagonal(cormatrix, 1.0)
R_chol = linalg.cholesky(cormatrix, lower=False)
y_mat = linalg.solve_triangular(R_chol, y_mat.T, trans="T").T
design = linalg.solve_triangular(R_chol, design, trans="T")
Q, R_qr = linalg.qr(design, mode="full")
signc = float(np.sign(R_qr[p - 1, p - 1]))
effects = Q.T @ y_mat.T # (n, ngenes)
# s2 <- colMeans(effects[-(1:p),]^2)
s2 = np.mean(effects[p:, :] ** 2, axis=0)
sv = squeeze_var(s2, df=d)
d0 = sv["df_prior"]
s02 = sv["var_prior"]
sd_post = np.sqrt(sv["var_post"])
# Y <- effects[-(1:p0),,drop=FALSE] p0 = p-1
p0 = p - 1
Y = effects[p0:, :].copy() # (d+1, ngenes)
YY = np.sum(Y**2, axis=0)
B = Y[0, :]
modt = signc * B / sd_post
if shrink_resid:
from .utils import p_adjust as _p_adjust # noqa: F401 (not used)
from .utils import prop_true_null as _prop_true_null
p_value = 2 * stats.t.sf(np.abs(modt), df=d0 + d)
proportion = 1 - _prop_true_null(p_value)
stdev_unscaled = np.full(
ngenes,
1.0
/ abs(
R_qr[
int(np.sum(np.abs(np.diag(R_qr)) > 1e-10)) - 1,
int(np.sum(np.abs(np.diag(R_qr)) > 1e-10)) - 1,
]
),
)
var_unscaled = stdev_unscaled**2
df_total = np.full(ngenes, d) + sv["df_prior"]
stdev_coef_lim = (0.1, 4.0)
var_prior_lim = (
stdev_coef_lim[0] ** 2 / sv["var_prior"],
stdev_coef_lim[1] ** 2 / sv["var_prior"],
)
from .ebayes import _tmixture_vector
var_prior = _tmixture_vector(modt, stdev_unscaled, df_total, proportion, var_prior_lim)
if np.isnan(var_prior):
var_prior = 1.0 / sv["var_prior"]
warnings.warn("Estimation of var.prior failed - set to default value")
r = (var_unscaled + var_prior) / var_unscaled
if sv["df_prior"] > 1e6:
kernel = modt**2 * (1 - 1 / r) / 2
else:
kernel = (1 + df_total) / 2 * np.log((modt**2 + df_total) / (modt**2 / r + df_total))
lods = np.log(proportion / (1 - proportion)) - np.log(r) / 2 + kernel
ProbDE = np.exp(lods) / (1 + np.exp(lods))
Y[0, :] = Y[0, :] * np.sqrt(var_unscaled / (var_unscaled + var_prior * ProbDE))
if set_statistic not in ("mean", "floormean", "mean50"):
raise ValueError(
f"set.statistic '{set_statistic}' not recognized. "
"Must be 'mean', 'floormean', or 'mean50'."
)
# Pre-compute per-gene-set membership structure (AllIndices, Set)
all_indices = np.concatenate([_resolve_set_index(index[nm], ngenes) for nm in set_names])
Set_vec = np.concatenate(
[
np.full(_resolve_set_index(index[nm], ngenes).size, si, dtype=np.int64)
for si, nm in enumerate(set_names)
]
)
def _rank_r(x):
# R's rank() with default ties="average"
return stats.rankdata(x, method="average")
def _rowsum_mean(rank_mat):
# rank_mat shape (len(all_indices), ncol); group sum by Set_vec, divide
# by SetSizes.
if rank_mat.ndim == 1:
rank_mat = rank_mat.reshape(-1, 1)
out = np.zeros((nsets, rank_mat.shape[1]))
for c in range(rank_mat.shape[1]):
out[:, c] = np.bincount(Set_vec, weights=rank_mat[:, c], minlength=nsets)
out = out / SetSizes[:, np.newaxis]
return out
p_value = np.zeros((nsets, 3))
if set_statistic == "mean":
obs_ranks = np.column_stack(
[_rank_r(modt), ngenes - _rank_r(modt) + 1, _rank_r(np.abs(modt))]
)
obs_set_ranks = _rowsum_mean(obs_ranks[all_indices, :])
for _ in range(nrot):
R_vec = rng_state.standard_normal((1, d + 1))
R_vec = R_vec / np.sqrt(np.sum(R_vec**2))
Br = (R_vec @ Y).ravel()
s2r = (YY - Br**2) / d
if np.isfinite(d0):
sdr_post = np.sqrt((d0 * s02 + d * s2r) / (d0 + d))
else:
sdr_post = np.sqrt(s02)
modtr = signc * Br / sdr_post
rot_ranks = np.column_stack(
[_rank_r(modtr), ngenes - _rank_r(modtr) + 1, _rank_r(np.abs(modtr))]
)
rot_set_ranks = _rowsum_mean(rot_ranks[all_indices, :])
p_value += (rot_set_ranks >= obs_set_ranks).astype(np.float64)
elif set_statistic == "floormean":
obs_ranks = np.column_stack(
[
_rank_r(np.maximum(modt, 0)),
_rank_r(np.maximum(-modt, 0)),
_rank_r(np.maximum(np.abs(modt), 1)),
]
)
obs_set_ranks = _rowsum_mean(obs_ranks[all_indices, :])
for _ in range(nrot):
R_vec = rng_state.standard_normal((1, d + 1))
R_vec = R_vec / np.sqrt(np.sum(R_vec**2))
Br = (R_vec @ Y).ravel()
s2r = (YY - Br**2) / d
if np.isfinite(d0):
sdr_post = np.sqrt((d0 * s02 + d * s2r) / (d0 + d))
else:
sdr_post = np.sqrt(s02)
modtr = signc * Br / sdr_post
rot_ranks = np.column_stack(
[
_rank_r(np.maximum(modtr, 0)),
_rank_r(np.maximum(-modtr, 0)),
_rank_r(np.maximum(np.abs(modtr), 1)),
]
)
rot_set_ranks = _rowsum_mean(rot_ranks[all_indices, :])
p_value += (rot_set_ranks >= obs_set_ranks).astype(np.float64)
elif set_statistic == "mean50":
s_r = _rank_r(modt)
s_abs_r = _rank_r(np.abs(modt))
m_half = np.floor((SetSizes + 1) / 2).astype(np.int64)
s_rank_mixed = np.zeros(nsets)
s_rank_up = np.zeros(nsets)
s_rank_down = np.zeros(nsets)
for i, nm in enumerate(set_names):
iset = _resolve_set_index(index[nm], ngenes)
mh = _mean_half(s_r[iset], m_half[i])
s_rank_up[i] = mh[1]
s_rank_down[i] = mh[0]
s_rank_mixed[i] = _mean_half(s_abs_r[iset], m_half[i])[1]
for _ in range(nrot):
R_vec = rng_state.standard_normal((1, d + 1))
R_vec = R_vec / np.sqrt(np.sum(R_vec**2))
Br = (R_vec @ Y).ravel()
s2r = (YY - Br**2) / d
if np.isfinite(d0):
sdr_post = np.sqrt((d0 * s02 + d * s2r) / (d0 + d))
else:
sdr_post = np.sqrt(s02)
modtr = signc * Br / sdr_post
s_r2 = _rank_r(modtr)
s_abs_r2 = _rank_r(np.abs(modtr))
for j, nm in enumerate(set_names):
iset = _resolve_set_index(index[nm], ngenes)
mh2 = _mean_half(s_r2[iset], m_half[j])
s_up_2 = mh2[1]
s_down_2 = mh2[0]
s_mixed_2 = _mean_half(s_abs_r2[iset], m_half[j])[1]
if s_up_2 >= s_rank_up[j]:
p_value[j, 0] += 1
if s_down_2 <= s_rank_down[j]:
p_value[j, 1] += 1
if s_mixed_2 >= s_rank_mixed[j]:
p_value[j, 2] += 1
p_value = (p_value + 1) / (nrot + 1)
out = pd.DataFrame(
{
"n_genes": SetSizes,
"up": p_value[:, 0],
"down": p_value[:, 1],
"mixed": p_value[:, 2],
},
index=set_names,
)
return out
# ---------------------------------------------------------------------------
# gene_set_test / rank_sum_test_with_correlation
# ---------------------------------------------------------------------------
[docs]
def rank_sum_test_with_correlation(
index,
statistics: np.ndarray,
correlation: float = 0.0,
df: float = np.inf,
) -> dict:
"""
Wilcoxon rank-sum test with an inter-gene correlation adjustment.
Port of R limma's ``rankSumTestWithCorrelation``.
Returns
-------
dict
``{"less": p_lower_tail, "greater": p_upper_tail}`` matching R's
named vector.
"""
stats_arr = np.asarray(statistics, dtype=np.float64)
n = stats_arr.size
r = stats.rankdata(stats_arr, method="average")
idx = np.asarray(index, dtype=np.int64)
r1 = r[idx]
n1 = r1.size
n2 = n - n1
U = n1 * n2 + n1 * (n1 + 1) / 2.0 - float(np.sum(r1))
mu = n1 * n2 / 2.0
if correlation == 0 or n1 == 1:
sigma2 = n1 * n2 * (n + 1) / 12.0
else:
sigma2 = (
np.arcsin(1.0) * n1 * n2
+ np.arcsin(0.5) * n1 * n2 * (n2 - 1)
+ np.arcsin(correlation / 2.0) * n1 * (n1 - 1) * n2 * (n2 - 1)
+ np.arcsin((correlation + 1) / 2.0) * n1 * (n1 - 1) * n2
)
sigma2 = sigma2 / 2.0 / np.pi
ties = len(np.unique(r)) != n
if ties:
_, counts = np.unique(r, return_counts=True)
counts = counts.astype(np.float64)
adjustment = float(np.sum(counts * (counts + 1) * (counts - 1)) / (n * (n + 1) * (n - 1)))
sigma2 = sigma2 * (1 - adjustment)
zlower = (U + 0.5 - mu) / np.sqrt(sigma2)
zupper = (U - 0.5 - mu) / np.sqrt(sigma2)
if np.isinf(df):
p_less = stats.norm.sf(zupper)
p_greater = stats.norm.cdf(zlower)
else:
p_less = stats.t.sf(zupper, df=df)
p_greater = stats.t.cdf(zlower, df=df)
return {"less": float(p_less), "greater": float(p_greater)}
[docs]
def gene_set_test( # noqa: A002 (`type` kwarg shadows builtin; deliberate R parity)
index,
statistics: np.ndarray,
alternative: str = "mixed",
type: str = "auto",
ranks_only: bool = True,
nsim: int = 9999,
rng=None,
) -> float:
"""
Competitive gene-set test.
Port of R limma's ``geneSetTest``. When ``ranks_only=True`` the
p-value is obtained analytically via
:func:`rank_sum_test_with_correlation`; otherwise a permutation
distribution is simulated using ``rng``.
The ``type`` keyword shadows the Python builtin `type`; this is
deliberate for R-signature parity (limma's ``geneSetTest(type=...)``).
The shadow is local to this function and the body never invokes
``type()``; callers keyword-passing ``type=...`` works as expected.
"""
alt_map = {
"two.sided": "either",
"less": "down",
"greater": "up",
}
if alternative not in ("mixed", "either", "down", "up", "less", "greater", "two.sided"):
raise ValueError(
f"alternative '{alternative}' not recognized. "
"Must be one of 'mixed', 'either', 'down', 'up', "
"'less', 'greater', 'two.sided'."
)
alternative = alt_map.get(alternative, alternative)
type_lower = type.lower()
if type_lower not in ("auto", "t", "f"):
raise ValueError(f"type '{type}' not recognized. Must be 'auto', 't' or 'f'.")
stats_arr = np.asarray(statistics, dtype=np.float64)
allsamesign = bool(np.all(stats_arr >= 0) or np.all(stats_arr <= 0))
if type_lower == "auto":
type_lower = "f" if allsamesign else "t"
if type_lower == "f" and alternative != "mixed":
raise ValueError('Only alternative="mixed" is possible with F-like statistics.')
if alternative == "mixed":
stats_arr = np.abs(stats_arr)
if alternative == "down":
stats_arr = -stats_arr
alternative = "up"
idx = np.asarray(index, dtype=np.int64)
if ranks_only:
pvals = rank_sum_test_with_correlation(index=idx, statistics=stats_arr, df=np.inf)
if alternative == "down":
return float(pvals["less"])
if alternative == "up":
return float(pvals["greater"])
if alternative == "either":
return float(2.0 * min(pvals["less"], pvals["greater"]))
# "mixed"
return float(pvals["greater"])
else:
rng_state = _resolve_rng(rng)
ssel = stats_arr[idx]
ssel = ssel[~np.isnan(ssel)]
nsel = ssel.size
if nsel == 0:
return 1.0
stat_all = stats_arr[~np.isnan(stats_arr)]
msel = float(np.mean(ssel))
if alternative == "either":
def posstat(x):
return abs(x)
else:
def posstat(x):
return x
msel = posstat(msel)
ntail = 1
for _ in range(nsim):
sample = rng_state.choice(stat_all, size=nsel, replace=False)
if posstat(float(np.mean(sample))) >= msel:
ntail += 1
return float(ntail) / (nsim + 1)
def wilcox_gst(index, statistics, **kwargs) -> float:
"""
Mean-rank gene-set test using a Wilcoxon / rank-sum test.
Port of R limma's ``wilcoxGST`` (``geneset-wilcox.R``). Thin
wrapper around :func:`gene_set_test` with ``ranks_only=True``.
"""
kwargs.pop("ranks_only", None)
return gene_set_test(index=index, statistics=statistics, ranks_only=True, **kwargs)
def top_romer(x, n: int = 10, alternative: str = "up"):
"""
Extract the top gene sets from :func:`romer` output.
Port of R limma's ``topRomer`` (``geneset-romer.R``). ``x`` is
expected to be a DataFrame (or similar tabular object) with
``Up`` / ``Down`` / ``Mixed`` / ``NGenes`` columns, matching
:func:`romer`'s return schema.
"""
import pandas as pd
if not isinstance(x, pd.DataFrame):
x = pd.DataFrame(x)
alternative = alternative.lower()
if alternative not in ("up", "down", "mixed"):
raise ValueError("alternative must be 'up', 'down', or 'mixed'")
n = min(n, x.shape[0])
# R's order() is stable; mergesort is the numpy equivalent.
if alternative == "up":
sort_keys = [x["Up"].values, x["Mixed"].values, -x["NGenes"].values]
elif alternative == "down":
sort_keys = [x["Down"].values, x["Mixed"].values, -x["NGenes"].values]
else: # mixed
sort_keys = [
x["Mixed"].values,
np.minimum(x["Up"].values, x["Down"].values),
-x["NGenes"].values,
]
# lexsort sorts by last key primarily; invert order so first key has highest priority.
order = np.lexsort(tuple(reversed(sort_keys)))
return x.iloc[order[:n], :]