# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
# voom.R Copyright (C) 2011-2026 Gordon Smyth, Charity Law
# vooma.R Copyright (C) 2012-2024 Gordon Smyth, Charity Law,
# Mengbo Li
# voomaLmFit.R Copyright (C) 2023-2024 Mengbo Li, Gordon Smyth
# voomWithQualityWeights.R Copyright (C) 2014-2025 Matt Ritchie, Cynthia Liu,
# Gordon Smyth
# Python port: Copyright (C) 2026 John Mulvey
"""
voom: Transform RNA-seq counts for linear modelling.
Implements voom and related functions from R limma for RNA-seq analysis:
- voom(): Transform counts to log-CPM with precision weights
- voom_with_quality_weights(): voom combined with sample quality weights
- vooma(): voom-like weights for non-count expression data
"""
from __future__ import annotations
import warnings
import numpy as np
from scipy import interpolate, linalg
from statsmodels.nonparametric.smoothers_lowess import lowess as sm_lowess
from .classes import EList, _is_anndata, get_eawp, put_eawp
from .lmfit import _parse_design, lm_fit
from .utils import choose_lowess_span
def _draw_voom_trend(
sx: np.ndarray,
sy: np.ndarray,
x_line: np.ndarray,
y_line: np.ndarray,
*,
xlab: str,
ylab: str,
title: str,
ax=None,
) -> None:
"""Render a voom/vooma mean-variance trend plot via matplotlib.
Lazy import matches the pattern used by ``qqt``/``qqf`` in
``pylimma/utils.py``: matplotlib is an optional dependency.
"""
try:
import matplotlib.pyplot as plt
except ImportError:
import warnings
warnings.warn("matplotlib not available for plotting")
return
if ax is None:
_fig, ax = plt.subplots()
ax.scatter(sx, sy, s=2, c="black")
ax.plot(x_line, y_line, color="red")
ax.set_xlabel(xlab)
ax.set_ylabel(ylab)
ax.set_title(title)
def _draw_array_weights_bar(aw: np.ndarray, ax=None, col=None) -> None:
"""Bar plot of sample-specific array weights (matches voomWithQualityWeights).
``col`` forwards R's ``col`` argument to matplotlib's bar ``color``
keyword so ``voomWithQualityWeights(counts, design, col="red")`` in
R translates directly.
"""
try:
import matplotlib.pyplot as plt
except ImportError:
import warnings
warnings.warn("matplotlib not available for plotting")
return
if ax is None:
_fig, ax = plt.subplots()
bar_kwargs = {}
if col is not None:
bar_kwargs["color"] = col
ax.bar(np.arange(1, len(aw) + 1), aw, **bar_kwargs)
ax.axhline(1.0, color="red", linestyle="--")
ax.set_xlabel("Sample")
ax.set_ylabel("Weight")
ax.set_title("Sample-specific weights")
def _normalize_between_arrays(y: np.ndarray, method: str = "none") -> np.ndarray:
"""Thin dispatcher onto :func:`pylimma.normalize.normalize_between_arrays`."""
from .normalize import normalize_between_arrays
return normalize_between_arrays(y, method=method)
[docs]
def voom(
counts,
design: np.ndarray | None = None,
lib_size: np.ndarray | None = None,
offset: np.ndarray | None = None,
offset_prior: np.ndarray | None = None,
normalize_method: str = "none",
block: np.ndarray | None = None,
correlation: float | None = None,
weights: np.ndarray | None = None,
span: float = 0.5,
adaptive_span: bool = True,
plot: bool = False,
save_plot: bool = False,
*,
out_layer: str = "voom_E",
weights_layer: str = "voom_weights",
key: str = "voom",
layer: str | None = None,
):
"""
Transform RNA-seq counts for linear modelling with mean-variance weighting.
Transforms count data to log2-counts per million (log-CPM), estimates the
mean-variance relationship, and computes observation weights that can be
used in weighted linear models via lm_fit().
Parameters
----------
counts : ndarray
Matrix of counts, shape (n_genes, n_samples). Must be non-negative
with no NA values.
design : ndarray, optional
Design matrix, shape (n_samples, n_coefficients). If None, uses an
intercept-only model.
lib_size : ndarray, optional
Library sizes for each sample. If None, computed as column sums of
counts.
offset : ndarray, optional
Offset matrix, shape (n_genes, n_samples). If provided without
offset_prior, offset_prior is computed as offset - rowMeans(offset).
offset_prior : ndarray, optional
Pre-centered offset matrix, shape (n_genes, n_samples). Applied as:
lib_size_matrix = exp(log(lib_size_matrix) + offset_prior).
Takes precedence over offset if both are provided.
normalize_method : str, default "none"
Normalization method. Currently only "none" is supported.
block : ndarray, optional
Factor indicating blocking structure for samples.
correlation : float, optional
Intra-block correlation (required if block is specified).
weights : ndarray, optional
Prior weights for samples or observations.
span : float, default 0.5
LOWESS span for trend fitting (used if adaptive_span=False).
adaptive_span : bool, default True
If True, choose span adaptively based on number of genes.
save_plot : bool, default False
If True, include trend data in output for plotting.
Returns
-------
dict
E : ndarray
Log2-CPM expression matrix, shape (n_genes, n_samples).
weights : ndarray
Precision weights, shape (n_genes, n_samples).
design : ndarray
Design matrix used for fitting.
lib_size : ndarray
Library sizes.
span : float, optional
LOWESS span used (only if adaptive_span=True).
voom_xy : dict, optional
Trend data for plotting (only if save_plot=True).
voom_line : dict, optional
LOWESS fit for plotting (only if save_plot=True).
offset_prior : ndarray, optional
The offset_prior matrix used (only if offset or offset_prior provided).
Notes
-----
The voom method [1]_ transforms count data to log2-CPM, then estimates
the mean-variance trend from the residual standard deviations of a
preliminary linear model fit. The trend is used to compute precision
weights for each observation.
References
----------
.. [1] Law CW, Chen Y, Shi W, Smyth GK (2014). voom: precision weights
unlock linear model analysis tools for RNA-seq read counts.
Genome Biology 15:R29.
"""
# Polymorphic input: ndarray / dict / EList / AnnData (R limma's voom.EList
# and .default branches are collapsed here via get_eawp).
original_input = counts
eawp = get_eawp(counts, layer=layer)
counts = np.asarray(eawp["exprs"], dtype=np.float64)
# EList-specific warn-and-proceed: R's voom calls as.matrix(EList)
# (voom.R:32) which drops every slot except E. pylimma keeps the more
# useful behaviour of honouring EList['design'] and EList['weights']
# but warns so users porting R code know the divergence is there.
# See known_diff_voom_elist_warning.md for rationale.
_input_is_elist = isinstance(original_input, EList)
if design is None and eawp.get("design") is not None:
if _input_is_elist:
warnings.warn(
"pylimma's voom is using 'design' from the EList's design "
"slot. R's voom would silently discard it via "
"as.matrix(EList). To match R behaviour, pass "
"design=el['design'] explicitly (or clear the slot).",
UserWarning,
)
design = eawp["design"]
if weights is None and eawp.get("weights") is not None:
if _input_is_elist:
warnings.warn(
"pylimma's voom is using prior weights from the EList's "
"weights slot. R's voom would silently discard them via "
"as.matrix(EList). To match R, pass "
"weights=el['weights'] explicitly.",
UserWarning,
)
weights = np.asarray(eawp["weights"], dtype=np.float64)
# Check counts
n_genes, n_samples = counts.shape
if n_genes < 2:
raise ValueError("Need at least two genes to fit a mean-variance trend")
if np.any(np.isnan(counts)):
raise ValueError("NA counts not allowed")
if np.min(counts) < 0:
raise ValueError("Negative counts not allowed")
# Parse design. Handles formula strings (via patsy + adata.obs when
# input is AnnData), ndarray / DataFrame / patsy DesignMatrix, or
# None (intercept-only). Matches lm_fit's dispatch.
sample_data = original_input.obs if _is_anndata(original_input) else None
design, _ = _parse_design(design, data=sample_data, n_samples=n_samples)
# Check lib_size
if lib_size is None:
lib_size = np.sum(counts, axis=0)
lib_size = np.asarray(lib_size, dtype=np.float64)
lib_size_matrix = np.broadcast_to(lib_size, (n_genes, n_samples)).copy()
# Handle offset parameters
if offset is not None:
offset = np.asarray(offset, dtype=np.float64)
if offset.shape != counts.shape:
raise ValueError("counts and offset must have equal dimensions.")
if offset_prior is None:
offset_prior = offset - np.mean(offset, axis=1, keepdims=True)
else:
warnings.warn("Ignoring offset in favor of offset_prior. Should not set both.")
offset = None
if offset_prior is not None:
offset_prior = np.asarray(offset_prior, dtype=np.float64)
if offset_prior.shape != counts.shape:
raise ValueError("counts and offset_prior must have equal dimensions.")
lib_size_matrix = np.exp(np.log(lib_size_matrix) + offset_prior)
# Choose span based on number of genes
if adaptive_span:
span = choose_lowess_span(n_genes, small_n=50, min_span=0.3, power=1 / 3)
# Compute log2-counts-per-million
# y = log2((counts+0.5)/(lib.size.matrix+1)*1e6)
y = np.log2((counts + 0.5) / (lib_size_matrix + 1) * 1e6)
y = _normalize_between_arrays(y, method=normalize_method)
# Fit linear model
fit = lm_fit(y, design, block=block, correlation=correlation, weights=weights)
# Compute Amean if not present
if fit.get("Amean") is None:
fit["Amean"] = np.nanmean(y, axis=1)
# If no replication found, set all weights to 1
n_with_reps = np.sum(fit["df_residual"] > 0)
if n_with_reps < 2:
if n_with_reps == 0:
warnings.warn("The experimental design has no replication. Setting weights to 1.")
elif n_with_reps == 1:
warnings.warn("Only one gene with any replication. Setting weights to 1.")
return put_eawp(
{
"E": y,
"weights": np.ones_like(y),
"design": design,
"lib_size": lib_size,
"targets": {"lib_size": lib_size},
},
original_input,
out_layer=out_layer,
weights_layer=weights_layer,
uns_key=key,
)
# Fit lowess trend to sqrt-standard-deviations by log-count-size
# sx = fit$Amean + mean(log2(lib.size+1)) - log2(1e6)
sx = fit["Amean"] + np.mean(np.log2(lib_size + 1)) - np.log2(1e6)
sy = np.sqrt(fit["sigma"])
# Exclude all-zero genes
all_zero = np.sum(counts, axis=1) == 0
if np.any(all_zero):
sx_fit = sx[~all_zero]
sy_fit = sy[~all_zero]
else:
sx_fit = sx
sy_fit = sy
# Fit LOWESS trend using statsmodels (matches R's lowess closely)
# it=3 matches R's default of 3 robustifying iterations
# delta=0.01*range(x) matches R's default delta parameter
x_range = sx_fit.max() - sx_fit.min()
lowess_result = sm_lowess(
sy_fit, sx_fit, frac=span, it=3, delta=0.01 * x_range, return_sorted=True
)
x_sorted = lowess_result[:, 0]
y_sorted = lowess_result[:, 1]
# approxfun with rule=2 extrapolates using boundary values
f = interpolate.interp1d(
x_sorted,
y_sorted,
kind="linear",
bounds_error=False,
fill_value=(y_sorted[0], y_sorted[-1]),
)
# Compute fitted values from linear model
# fitted.values = coefficients %*% t(design)
coefficients = fit["coefficients"]
rank = fit["rank"]
if rank < design.shape[1]:
pivot = fit["pivot"]
j = pivot[:rank]
fitted_values = coefficients[:, j] @ design[:, j].T
else:
fitted_values = coefficients @ design.T
# Convert to fitted counts
# fitted.cpm = 2^fitted.values
# fitted.count = 1e-6 * fitted.cpm * (lib.size.matrix+1)
# fitted.logcount = log2(fitted.count)
fitted_cpm = 2**fitted_values
fitted_count = 1e-6 * fitted_cpm * (lib_size_matrix + 1)
fitted_logcount = np.log2(fitted_count)
# Apply trend to individual observations
# w = 1/f(fitted.logcount)^4
trend_values = f(fitted_logcount)
w = 1 / trend_values**4
# Build output
out = {
"E": y,
"weights": w,
"design": design,
"lib_size": lib_size,
}
if adaptive_span:
out["span"] = span
if plot:
_draw_voom_trend(
sx_fit,
sy_fit,
x_sorted,
y_sorted,
xlab="log2( count size + 0.5 )",
ylab="Sqrt( standard deviation )",
title="voom: Mean-variance trend",
)
if save_plot:
out["voom_xy"] = {
"x": sx_fit,
"y": sy_fit,
"xlab": "log2( count size + 0.5 )",
"ylab": "Sqrt( standard deviation )",
}
out["voom_line"] = {
"x": x_sorted,
"y": y_sorted,
}
if offset_prior is not None:
out["offset_prior"] = offset_prior
return put_eawp(
out,
original_input,
out_layer=out_layer,
weights_layer=weights_layer,
uns_key=key,
)
[docs]
def voom_with_quality_weights(
counts,
design: np.ndarray | None = None,
lib_size: np.ndarray | None = None,
normalize_method: str = "none",
plot: bool = False,
span: float = 0.5,
adaptive_span: bool = True,
var_design: np.ndarray | None = None,
var_group: np.ndarray | None = None,
method: str = "genebygene",
maxiter: int = 50,
tol: float = 1e-5,
trace: bool = False,
col=None,
*,
out_layer: str = "voom_E",
weights_layer: str = "voom_weights",
key: str = "voom",
layer: str | None = None,
**voom_kwargs,
):
"""
voom transformation with sample-specific quality weights.
Combines the voom mean-variance modelling with sample-specific quality
weights estimated by array_weights(). This can improve power when some
samples have higher technical variability than others.
Parameters
----------
counts : ndarray
Matrix of counts, shape (n_genes, n_samples).
design : ndarray, optional
Design matrix. If None, uses an intercept-only model.
lib_size : ndarray, optional
Library sizes. If None, computed as column sums.
normalize_method : str, default "none"
Normalization method (passed to voom).
var_design : ndarray, optional
Design matrix for variance model.
var_group : ndarray, optional
Factor defining variance groups.
method : str, default "genebygene"
Method for array weights estimation.
maxiter : int, default 50
Maximum iterations for array weights.
tol : float, default 1e-5
Convergence tolerance for array weights.
trace : bool, default False
If True, print iteration progress to stdout during the second
array_weights() estimation (matches R behaviour).
span : float, default 0.5
LOWESS span (used if adaptive_span=False).
adaptive_span : bool, default True
If True, choose span adaptively.
Returns
-------
dict
Same keys as :func:`voom`, with one addition:
sample_weights : ndarray
Per-sample quality weights.
Notes
-----
The R `col` argument (bar colour for the array-weight plot) is not
exposed; matplotlib defaults are used when ``plot=True``.
See Also
--------
voom : Basic voom transformation.
array_weights : Estimate sample quality weights.
"""
# Import here to avoid circular import
from .weights import array_weights
# Extract raw counts once so internal voom calls work on plain ndarray and
# the final put_eawp packages the result based on the ORIGINAL input class.
original_input = counts
eawp = get_eawp(counts, layer=layer)
counts_arr = np.asarray(eawp["exprs"], dtype=np.float64)
# EList design warning: R's voomWithQualityWeights passes the EList
# into its internal voom (voomWithQualityWeights.R:13,19) which then
# hits as.matrix(EList) and drops all slots. pylimma picks design up
# here before the inner voom sees an ndarray, so we warn for parity
# with the R behaviour. Weights are not handled here (neither side
# uses y$weights in voomWithQualityWeights).
if design is None and eawp.get("design") is not None:
if isinstance(original_input, EList):
warnings.warn(
"pylimma's voom_with_quality_weights is using 'design' "
"from the EList's design slot. R's "
"voomWithQualityWeights would silently discard it via "
"as.matrix(EList). To match R behaviour, pass "
"design=el['design'] explicitly.",
UserWarning,
)
design = eawp["design"]
# Parse design once, up front. Inner voom calls are fed counts_arr
# (plain ndarray), so formula strings must be resolved here -
# otherwise the inner calls lose the adata.obs context needed by patsy.
sample_data = original_input.obs if _is_anndata(original_input) else None
n_samples = counts_arr.shape[1]
design, _ = _parse_design(design, data=sample_data, n_samples=n_samples)
# Initial voom without array weights
v = voom(
counts_arr,
design=design,
lib_size=lib_size,
normalize_method=normalize_method,
span=span,
adaptive_span=adaptive_span,
**voom_kwargs,
)
# Estimate array weights
aw = array_weights(
v,
design=design,
var_design=var_design,
var_group=var_group,
method=method,
maxiter=maxiter,
tol=tol,
)
# Re-run voom with array weights, drawing the trend if plot=True
v = voom(
counts_arr,
design=design,
weights=aw,
lib_size=lib_size,
normalize_method=normalize_method,
span=span,
adaptive_span=adaptive_span,
plot=plot,
**voom_kwargs,
)
# Update array weights with new voom output (matches R: trace only here)
aw = array_weights(
v,
design=design,
var_design=var_design,
var_group=var_group,
method=method,
maxiter=maxiter,
tol=tol,
trace=trace,
)
# Incorporate array weights into voom weights
# v$weights <- t(aw * t(v$weights))
v["weights"] = v["weights"] * aw[np.newaxis, :]
v["sample_weights"] = aw
if plot:
_draw_array_weights_bar(aw, col=col)
return put_eawp(
dict(v),
original_input,
out_layer=out_layer,
weights_layer=weights_layer,
uns_key=key,
)
[docs]
def vooma(
y,
design: np.ndarray | None = None,
block: np.ndarray | None = None,
correlation: float | None = None,
predictor: np.ndarray | None = None,
span: float | None = None,
legacy_span: bool = False,
plot: bool = False,
save_plot: bool = False,
*,
out_layer: str = "vooma_E",
weights_layer: str = "vooma_weights",
key: str = "vooma",
layer: str | None = None,
):
"""
voom-like weights for non-count expression data.
Similar to voom but for continuous log-expression data (e.g., microarray).
Estimates the mean-variance relationship and computes observation weights.
Parameters
----------
y : ndarray
Expression matrix (log-scale), shape (n_genes, n_samples).
design : ndarray, optional
Design matrix. If None, uses an intercept-only model.
block : ndarray, optional
Factor indicating blocking structure.
correlation : float, optional
Intra-block correlation (required if block is specified).
predictor : ndarray, optional
Precision predictor, shape (n_genes,) or (n_genes, n_samples). When
given, the variance trend is fitted against a linear combination of
average log-expression and the row-mean predictor, and sample-specific
weights are derived from the predictor.
span : float, optional
LOWESS span. If None, chosen adaptively.
legacy_span : bool, default False
If True, use the legacy adaptive-span rule
(small_n=10, power=0.5); otherwise use small_n=50, power=1/3.
Ignored if `span` is given.
save_plot : bool, default False
If True, include trend data in output.
Returns
-------
dict
E : ndarray
Expression matrix (same as input y).
weights : ndarray
Precision weights, shape (n_genes, n_samples).
design : ndarray
Design matrix.
span : float
LOWESS span used.
voom_xy : dict, optional
Trend data (only if save_plot=True).
voom_line : dict, optional
LOWESS fit (only if save_plot=True).
"""
# Polymorphic input dispatch (R: vooma.EList and .default branches).
original_input = y
eawp = get_eawp(y, layer=layer)
y = np.asarray(eawp["exprs"], dtype=np.float64)
if design is None and eawp.get("design") is not None:
design = eawp["design"]
n_genes, n_samples = y.shape
# Parse design (formula strings via patsy + adata.obs when AnnData).
sample_data = original_input.obs if _is_anndata(original_input) else None
design, _ = _parse_design(design, data=sample_data, n_samples=n_samples)
# Compute row means
A = np.nanmean(y, axis=1)
if np.any(np.isnan(A)):
raise ValueError("y contains entirely NA rows")
# Validate predictor (matches R's shape + NA handling)
if predictor is not None:
predictor = np.asarray(predictor, dtype=np.float64)
if predictor.ndim == 1:
if predictor.shape[0] != n_genes:
raise ValueError("predictor is of wrong dimension")
predictor = np.broadcast_to(predictor[:, np.newaxis], (n_genes, n_samples)).copy()
elif predictor.ndim == 2:
if predictor.shape[0] != n_genes:
raise ValueError("predictor is of wrong dimension")
if predictor.shape[1] == 1:
predictor = np.broadcast_to(predictor, (n_genes, n_samples)).copy()
elif predictor.shape[1] != n_samples:
raise ValueError("predictor is of wrong dimension")
else:
raise ValueError("predictor is of wrong dimension")
if np.any(np.isnan(predictor)):
y_has_na = np.any(np.isnan(y))
if y_has_na:
if np.any(np.isnan(predictor[~np.isnan(y)])):
raise ValueError("All observed y values must have non-NA predictors")
else:
raise ValueError("All observed y values must have non-NA predictors")
# Fit linear model
if block is None:
# Simple OLS fit
from scipy import linalg
q, r, pivot = linalg.qr(design, pivoting=True)
rank = np.sum(np.abs(np.diag(r)) > 1e-10)
qty = q.T @ y.T
fitted = (q[:, :rank] @ qty[:rank, :]).T
# Residual variance
residual_effects = qty[rank:, :]
s2 = np.mean(residual_effects**2, axis=0)
else:
# With block correlation
block = np.asarray(block)
if len(block) != n_samples:
raise ValueError("Length of block does not match number of arrays")
if correlation is None:
raise ValueError(
"correlation must be specified when block is provided. "
"Use duplicate_correlation() to estimate intra-block correlation."
)
unique_blocks = np.unique(block)
n_blocks = len(unique_blocks)
# Build block indicator matrix Z
Z = np.zeros((n_samples, n_blocks))
for i, ub in enumerate(unique_blocks):
Z[:, i] = (block == ub).astype(float)
# Correlation matrix
cormatrix = Z @ (correlation * Z.T)
np.fill_diagonal(cormatrix, 1.0)
# Cholesky decomposition
from scipy import linalg
chol_v = linalg.cholesky(cormatrix, lower=False)
# Transform data and design
z = linalg.solve_triangular(chol_v, y.T, trans="T")
X = linalg.solve_triangular(chol_v, design, trans="T")
# Fit
q, r, pivot = linalg.qr(X, pivoting=True)
rank = np.sum(np.abs(np.diag(r)) > 1e-10)
qtz = q.T @ z
fitted_transformed = q[:, :rank] @ qtz[:rank, :]
fitted = (chol_v.T @ fitted_transformed).T
# Residual variance
residual_effects = qtz[rank:, :]
s2 = np.mean(residual_effects**2, axis=0)
# Prepare for trend fitting
sx = A
sy = np.sqrt(np.sqrt(s2)) # Quarter-root variance = sqrt(sqrt(s2))
mu = fitted
# Optionally combine ave log intensity with precision predictor.
# R: vartrend <- lm.fit(cbind(1, sx, sxc), sy); sx <- vartrend$fitted.values;
# mu <- beta[1] + beta[2]*mu + beta[3]*predictor
if predictor is not None:
sxc = np.nanmean(predictor, axis=1)
vartrend_design = np.column_stack([np.ones(n_genes), sx, sxc])
beta, *_ = linalg.lstsq(vartrend_design, sy)
sx = vartrend_design @ beta
mu = beta[0] + beta[1] * mu + beta[2] * predictor
xlab = "Combined predictor"
else:
xlab = "Average log-expression"
# Choose span (legacy vs default adaptive rule)
if span is None:
if legacy_span:
span = choose_lowess_span(n_genes, small_n=10, min_span=0.3, power=0.5)
else:
span = choose_lowess_span(n_genes, small_n=50, min_span=0.3, power=1 / 3)
# Fit LOWESS trend using statsmodels (matches R's lowess closely)
# delta=0.01*range(x) matches R's default delta parameter
x_range = sx.max() - sx.min()
lowess_result = sm_lowess(sy, sx, frac=span, it=3, delta=0.01 * x_range, return_sorted=True)
x_sorted = lowess_result[:, 0]
y_sorted = lowess_result[:, 1]
f = interpolate.interp1d(
x_sorted,
y_sorted,
kind="linear",
bounds_error=False,
fill_value=(y_sorted[0], y_sorted[-1]),
)
# Compute weights from fitted values (mu)
w = 1 / f(mu) ** 4
# Build output
out = {
"E": y,
"weights": w,
"design": design,
"span": span,
}
if plot:
_draw_voom_trend(
sx,
sy,
x_sorted,
y_sorted,
xlab=xlab,
ylab="Sqrt( standard deviation )",
title=(
"vooma variance trend" if predictor is not None else "vooma mean-variance trend"
),
)
if save_plot:
out["voom_xy"] = {
"x": sx,
"y": sy,
"xlab": xlab,
"ylab": "Sqrt( standard deviation )",
}
out["voom_line"] = {
"x": x_sorted,
"y": y_sorted,
}
return put_eawp(
out,
original_input,
out_layer=out_layer,
weights_layer=weights_layer,
uns_key=key,
)
[docs]
def vooma_lm_fit(
y,
design=None,
prior_weights: np.ndarray | None = None,
block: np.ndarray | None = None,
sample_weights: bool = False,
var_design: np.ndarray | None = None,
var_group: np.ndarray | None = None,
prior_n: float = 10,
predictor: np.ndarray | None = None,
span: float | None = None,
legacy_span: bool = False,
plot: bool = False,
save_plot: bool = False,
keep_elist: bool = True,
*,
key: str = "pylimma",
voom_key: str = "vooma",
weights_layer: str = "vooma_weights",
layer: str | None = None,
) -> dict | None:
"""
Combined vooma + lmFit with iterative refinement.
Applies vooma-style mean-variance modelling to expression data, then fits
a linear model. If block correlation or sample weights are requested,
performs one iteration to refine the estimates.
Parameters
----------
y : ndarray
Expression matrix (log-scale), shape (n_genes, n_samples).
design : ndarray, optional
Design matrix. If None, uses an intercept-only model.
prior_weights : ndarray, optional
Prior observation weights. Cannot be combined with sample_weights.
block : ndarray, optional
Block factor for correlated samples.
sample_weights : bool, default False
If True, estimate sample-specific quality weights.
var_design : ndarray, optional
Design matrix for variance model (used with sample_weights).
var_group : ndarray, optional
Factor defining variance groups (used with sample_weights).
prior_n : float, default 10
Prior sample size for array weights estimation.
span : float, optional
LOWESS span. If None, chosen adaptively.
legacy_span : bool, default False
If True, use legacy span selection algorithm.
save_plot : bool, default False
If True, include trend data in output.
keep_elist : bool, default True
If True, include expression data with weights in output (dict
return path only; AnnData input always writes the weights layer
regardless).
key : str, default "pylimma"
AnnData ``adata.uns`` key for the fit slots (mirrors
:func:`lm_fit`).
voom_key : str, default "vooma"
AnnData ``adata.uns`` key for ancillary metadata (span,
sample_weights, voom_xy, voom_line). Keeps the fit and the
voom-like ancillaries in separate uns buckets, matching the
``vooma()`` + ``lm_fit()`` split.
weights_layer : str, default "vooma_weights"
AnnData output layer for the computed observation weights.
layer : str, optional
AnnData input layer to read expression from. Defaults to
``adata.X``.
Returns
-------
dict or None
For AnnData input: mutates adata (weights layer, fit in
``adata.uns[key]``, ancillaries in ``adata.uns[voom_key]``) and
returns ``None``.
For ndarray / EList / dict input: returns a dict with the fit
plus span, targets (if requested), EList (if keep_elist), and
voom_xy / voom_line (if save_plot).
Notes
-----
This function combines vooma() and lm_fit() with optional iterative
refinement of sample weights and intra-block correlation.
"""
from .classes import as_matrix_weights
from .dups import duplicate_correlation
from .lmfit import lm_fit
from .weights import array_weights
# Polymorphic input dispatch (ndarray / dict / EList / AnnData),
# mirroring voom / vooma. Pull design and prior_weights off the
# input when the caller left them unset.
original_input = y
eawp = get_eawp(y, layer=layer)
y = np.asarray(eawp["exprs"], dtype=np.float64)
if design is None and eawp.get("design") is not None:
design = eawp["design"]
if prior_weights is None and eawp.get("weights") is not None:
prior_weights = np.asarray(eawp["weights"], dtype=np.float64)
n_genes, n_samples = y.shape
if n_samples < 2:
raise ValueError("Too few samples")
if n_genes < 2:
raise ValueError("Need multiple rows")
# Compute row means
A = np.nanmean(y, axis=1)
if np.any(np.isnan(A)):
raise ValueError("y contains entirely NA rows")
# Validate predictor (mirrors vooma())
if predictor is not None:
predictor = np.asarray(predictor, dtype=np.float64)
if predictor.ndim == 1:
if predictor.shape[0] != n_genes:
raise ValueError("predictor is of wrong dimension")
predictor = np.broadcast_to(predictor[:, np.newaxis], (n_genes, n_samples)).copy()
elif predictor.ndim == 2:
if predictor.shape[0] != n_genes:
raise ValueError("predictor is of wrong dimension")
if predictor.shape[1] == 1:
predictor = np.broadcast_to(predictor, (n_genes, n_samples)).copy()
elif predictor.shape[1] != n_samples:
raise ValueError("predictor is of wrong dimension")
else:
raise ValueError("predictor is of wrong dimension")
if np.any(np.isnan(predictor)):
y_has_na = np.any(np.isnan(y))
if y_has_na:
if np.any(np.isnan(predictor[~np.isnan(y)])):
raise ValueError("All observed y values must have non-NA predictors")
else:
raise ValueError("All observed y values must have non-NA predictors")
# Parse design (formula strings via patsy + adata.obs when AnnData).
sample_data = original_input.obs if _is_anndata(original_input) else None
design, _ = _parse_design(design, data=sample_data, n_samples=n_samples)
# Check for conflicting weight specifications
use_sample_weights = sample_weights or var_design is not None or var_group is not None
if prior_weights is not None and use_sample_weights:
raise ValueError("Cannot specify prior_weights and estimate sample weights")
use_block = block is not None
# Initial fit
fit = lm_fit(y, design, weights=prior_weights)
# Compute fitted values
if fit["rank"] < design.shape[1]:
pivot = fit.get("pivot", np.arange(design.shape[1]))
j = pivot[: fit["rank"]]
fitted_values = fit["coefficients"][:, j] @ design[:, j].T
else:
fitted_values = fit["coefficients"] @ design.T
# Prepare for trend fitting
sx = A
sy = np.sqrt(fit["sigma"])
mu = fitted_values
# Optionally combine ave log intensity with precision predictor
sxc = None
if predictor is not None:
sxc = np.nanmean(predictor, axis=1)
vartrend_design = np.column_stack([np.ones(n_genes), sx, sxc])
beta, *_ = linalg.lstsq(vartrend_design, sy)
sx = vartrend_design @ beta
mu = beta[0] + beta[1] * mu + beta[2] * predictor
# Choose span
if span is None:
if legacy_span:
span = choose_lowess_span(n_genes, small_n=10, min_span=0.3, power=0.5)
else:
span = choose_lowess_span(n_genes, small_n=50, min_span=0.3, power=1 / 3)
# Fit LOWESS trend
x_range = sx.max() - sx.min()
lowess_result = sm_lowess(sy, sx, frac=span, it=3, delta=0.01 * x_range, return_sorted=True)
x_sorted = lowess_result[:, 0]
y_sorted = lowess_result[:, 1]
# Create interpolating function
f = interpolate.interp1d(
x_sorted,
y_sorted,
kind="linear",
bounds_error=False,
fill_value=(y_sorted[0], y_sorted[-1]),
)
# Compute vooma weights
w = 1 / f(mu) ** 4
# Combine with prior weights if provided
if prior_weights is not None:
# Reshape prior_weights to (n_genes, n_samples) via asMatrixWeights
# before multiplying so probe-weight / array-weight / scalar shapes
# all behave like R's vooma path.
from .classes import as_matrix_weights
weights = w * as_matrix_weights(prior_weights, (n_genes, n_samples))
else:
weights = w
# Estimate sample weights if requested
sw = None
if use_sample_weights:
sw = array_weights(
{"E": y, "weights": weights},
design=design,
var_design=var_design,
var_group=var_group,
prior_n=prior_n,
)
if use_block:
# Apply sample weights to observation weights
weights = weights * sw[np.newaxis, :]
# Estimate block correlation if requested
correlation = None
if use_block:
dc = duplicate_correlation(y, design, block=block, weights=weights)
correlation = dc["consensus_correlation"]
if np.isnan(correlation):
correlation = 0.0
# Second iteration if block or sample weights requested
if use_block or use_sample_weights:
# Reset weights for refit
if use_sample_weights:
weights = np.broadcast_to(sw, (n_genes, n_samples)).copy()
else:
weights = prior_weights
# Refit with correlation
fit = lm_fit(y, design, block=block, correlation=correlation, weights=weights)
# Recompute fitted values
if fit["rank"] < design.shape[1]:
pivot = fit.get("pivot", np.arange(design.shape[1]))
j = pivot[: fit["rank"]]
fitted_values = fit["coefficients"][:, j] @ design[:, j].T
else:
fitted_values = fit["coefficients"] @ design.T
# Refit LOWESS trend
sx = A
sy = np.sqrt(fit["sigma"])
mu = fitted_values
# Re-apply predictor combination using the cached sxc
if predictor is not None:
vartrend_design = np.column_stack([np.ones(n_genes), sx, sxc])
beta, *_ = linalg.lstsq(vartrend_design, sy)
sx = vartrend_design @ beta
mu = beta[0] + beta[1] * mu + beta[2] * predictor
x_range = sx.max() - sx.min()
lowess_result = sm_lowess(sy, sx, frac=span, it=3, delta=0.01 * x_range, return_sorted=True)
x_sorted = lowess_result[:, 0]
y_sorted = lowess_result[:, 1]
f = interpolate.interp1d(
x_sorted,
y_sorted,
kind="linear",
bounds_error=False,
fill_value=(y_sorted[0], y_sorted[-1]),
)
# Recompute vooma weights
w = 1 / f(mu) ** 4
# Combine with prior weights
if prior_weights is not None:
# Reshape prior_weights to (n_genes, n_samples) via
# asMatrixWeights before multiplying so probe-weight /
# array-weight / scalar shapes all behave like R's vooma.
from .classes import as_matrix_weights
weights = w * as_matrix_weights(prior_weights, (n_genes, n_samples))
else:
weights = w
# Re-estimate sample weights
if use_sample_weights:
sw = array_weights(
{"E": y, "weights": weights},
design=design,
var_design=var_design,
var_group=var_group,
prior_n=prior_n,
)
weights = weights * sw[np.newaxis, :]
# Re-estimate block correlation
if use_block:
dc = duplicate_correlation(y, design, block=block, weights=weights)
correlation = dc["consensus_correlation"]
if np.isnan(correlation):
correlation = 0.0
# Final fit
fit = lm_fit(y, design, block=block, correlation=correlation, weights=weights)
# Add span to output
fit["span"] = span
# Add sample weights to targets
if use_sample_weights:
fit["targets"] = {"sample_weights": sw}
# Render trend plot if requested
if plot:
_draw_voom_trend(
sx,
sy,
x_sorted,
y_sorted,
xlab=("Combined predictor" if predictor is not None else "Average log2 expression"),
ylab="Sqrt( standard deviation )",
title=(
"vooma variance trend" if predictor is not None else "vooma mean-variance trend"
),
)
# Add plot data if requested
if save_plot:
fit["voom_xy"] = {
"x": sx,
"y": sy,
"xlab": "Average log-expression",
"ylab": "Sqrt( standard deviation )",
}
fit["voom_line"] = {
"x": x_sorted,
"y": y_sorted,
}
# Add EList if requested
if keep_elist:
fit["EList"] = {
"E": y,
"weights": weights,
}
# Polymorphic output dispatch. AnnData input splits the bundled
# output across (layer, fit-uns, vooma-uns); ndarray / EList / dict
# callers get the full single-dict return as today.
if _is_anndata(original_input):
adata = original_input
# Observation weights layer (limma (n_genes, n_samples) -> AnnData
# (n_samples, n_genes) via as_matrix_weights normalisation + .T).
W = as_matrix_weights(weights, (n_genes, n_samples))
adata.layers[weights_layer] = W.T
# Fit slots under the lm_fit uns key (plain dict for h5ad compat).
bundled = {"EList", "voom_xy", "voom_line", "targets", "span"}
fit_slots = {k: v for k, v in fit.items() if k not in bundled}
adata.uns[key] = dict(fit_slots)
# Ancillary metadata under the vooma uns key. Mirrors what
# vooma() writes, plus sample_weights when estimated.
ancillary = {}
if fit.get("span") is not None:
ancillary["span"] = fit["span"]
if use_sample_weights and isinstance(fit.get("targets"), dict):
ancillary.update(fit["targets"]) # {"sample_weights": sw}
if save_plot:
if "voom_xy" in fit:
ancillary["voom_xy"] = fit["voom_xy"]
if "voom_line" in fit:
ancillary["voom_line"] = fit["voom_line"]
if ancillary:
adata.uns[voom_key] = ancillary
return None
return fit
def vooma_by_group(
y,
group,
design=None,
block=None,
correlation=None,
span=None,
legacy_span: bool = False,
plot: bool = False,
*,
out_layer: str = "vooma_E",
weights_layer: str = "vooma_weights",
uns_key: str = "vooma",
layer: str | None = None,
):
"""
Vooma with group-specific mean-variance trends.
Port of R limma's ``voomaByGroup`` (``vooma.R``). Fits one vooma
trend per ``group`` level and stitches the observation weights
back together. Plotting is not emitted; ``plot=True`` is accepted
for API compatibility but ignored.
Output
------
Polymorphic, matching :func:`vooma`:
- AnnData in: writes ``adata.layers[out_layer]`` (copy of E),
``adata.layers[weights_layer]`` (computed weights), and
``adata.uns[uns_key]`` (design + group). Returns ``None``.
- EList in: returns a new ``EList`` with updated slots.
- ndarray / dict in: returns a dict with ``E``, ``weights``,
``design``, ``group``.
The default ``uns_key="vooma"`` means ``lm_fit(adata, layer="vooma_E")``
picks up the design via the usual :func:`get_eawp` fallback.
"""
import pandas as pd
from .classes import get_eawp, put_eawp
original_input = y
eawp = get_eawp(y, layer=layer)
E = np.asarray(eawp["exprs"], dtype=np.float64)
if design is None:
design = eawp.get("design")
ngenes, narrays = E.shape
# plot=True is accepted for R-signature compatibility but pylimma
# does not yet emit the per-group mean-variance figure. Warn so the
# caller knows their request was silently ignored (R's voomaByGroup
# at vooma.R:116-118 would draw it).
if plot:
warnings.warn(
"vooma_by_group(plot=True) is not implemented in pylimma; "
"no plot will be drawn. Pass plot=False to silence this "
"warning.",
UserWarning,
)
group_arr = np.asarray(pd.Categorical(group))
levels = list(pd.Categorical(group).categories)
ngroups = len(levels)
if group_arr.size != narrays:
raise ValueError("length(group) must equal ncol(y)")
if design is None:
# ~0 + group one-hot
design = np.zeros((narrays, ngroups))
for j, lev in enumerate(levels):
design[group_arr == lev, j] = 1.0
design = np.asarray(design, dtype=np.float64)
weights = np.empty_like(E)
for lev in levels:
mask = group_arr == lev
if mask.sum() < 2:
# Singleton level: fall back to a global vooma fit.
v_all = vooma(
{"E": E},
design=design,
block=block,
correlation=correlation,
span=span,
legacy_span=legacy_span,
plot=False,
)
weights[:, mask] = v_all["weights"][:, mask]
continue
sub = {"E": E[:, mask]}
v = vooma(
sub,
design=np.ones((int(mask.sum()), 1)),
span=span,
legacy_span=legacy_span,
plot=False,
)
weights[:, mask] = v["weights"]
return put_eawp(
{
"E": E,
"weights": weights,
"design": design,
"group": group_arr,
},
original_input,
out_layer=out_layer,
weights_layer=weights_layer,
uns_key=uns_key,
)