# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
# chooseLowessSpan.R Copyright (C) 2020-2024 Gordon Smyth
# weightedLowess.R (loess_fit port)
# Copyright (C) 2014-2020 Aaron Lun
# qqt.R (qqt) Copyright (C) 2002 Gordon Smyth
# qqt.R (qqf) Copyright (C) 2012 Belinda Phipson
# ebayes.R (trigamma_inverse helper)
# Copyright (C) 2002-2004 Gordon Smyth
# zscore.R Copyright (C) 2003-2020 Gordon Smyth
# zscoreHyper.R Copyright (C) 2012 Gordon Smyth
# tricubeMovingAverage.R Copyright (C) 2014-2015 Gordon Smyth,
# Yifang Hu
# convest.R Copyright (C) 2004-2020 Egil Ferkingstad,
# Mette Langaas,
# Gordon Smyth,
# Marcus Davy
# propTrueNull.R Copyright (C) 2012 Belinda Phipson,
# Gordon Smyth
# detectionPValues.R Copyright (C) 2016 Gordon Smyth
# logsumexp.R (logcosh, logsumexp)
# Copyright (C) 2007-2018 Gordon Smyth
# bwss.R (bwss, bwss_matrix) Copyright (C) 2002 Gordon Smyth
# utility.R (is_numeric, block_diag)
# Copyright (C) 2003-2004 Gordon Smyth
# combine.R (make_unique) Copyright (C) 2003-2016 Gordon Smyth
#
# logmdigamma() is a port of the same-named function from the R statmod
# package (not from limma):
# statmod::logmdigamma Copyright (C) Gordon Smyth, Lizhong Chen;
# GPL-2 | GPL-3
# Python port: Copyright (C) 2026 John Mulvey
"""
Utility functions for pylimma.
Internal helper functions used throughout the package.
"""
from __future__ import annotations
import sys
import numpy as np
import pandas as pd
from scipy import stats as _scipy_stats
from scipy.special import digamma, polygamma
def trigamma_inverse(x: np.ndarray | float) -> np.ndarray | float:
"""
Solve trigamma(y) = x for y.
Uses Newton iteration on the inverse of trigamma. Port of R limma's
trigammaInverse function (Gordon Smyth, 2002-2004).
Parameters
----------
x : array_like
Values at which to evaluate the inverse trigamma function.
Must be positive for valid results.
Returns
-------
ndarray or float
The value y such that trigamma(y) = x.
Notes
-----
- Returns NaN for negative inputs (with warning)
- For very large x (> 1e7), uses asymptotic approximation 1/sqrt(x)
- For very small x (< 1e-6), uses asymptotic approximation 1/x
"""
x = np.asarray(x)
scalar_input = x.ndim == 0
x = np.atleast_1d(x).astype(np.float64)
y = np.empty_like(x)
# Handle special cases (order matters - check sequentially)
na_mask = np.isnan(x)
neg_mask = ~na_mask & (x < 0)
large_mask = ~na_mask & ~neg_mask & (x > 1e7)
small_mask = ~na_mask & ~neg_mask & (x < 1e-6)
normal_mask = ~(na_mask | neg_mask | large_mask | small_mask)
# Propagate NaN
y[na_mask] = np.nan
# Negative values -> NaN with warning
if np.any(neg_mask):
import warnings
warnings.warn("NaNs produced", RuntimeWarning)
y[neg_mask] = np.nan
# Asymptotic approximations
y[large_mask] = 1.0 / np.sqrt(x[large_mask])
y[small_mask] = 1.0 / x[small_mask]
# Newton iteration for normal range
if np.any(normal_mask):
x_norm = x[normal_mask]
# Initial guess: 1/trigamma(y) is approximately y - 0.5 for moderate y
y_iter = 0.5 + 1.0 / x_norm
for _ in range(50):
tri = polygamma(1, y_iter) # trigamma
psigamma2 = polygamma(2, y_iter) # tetragamma
dif = tri * (1.0 - tri / x_norm) / psigamma2
y_iter = y_iter + dif
if np.max(-dif / y_iter) < 1e-8:
break
else:
import warnings
warnings.warn("Iteration limit exceeded in trigamma_inverse")
y[normal_mask] = y_iter
if scalar_input:
return float(y[0])
return y
def logmdigamma(x: np.ndarray | float) -> np.ndarray | float:
"""
Compute log(x) - digamma(x).
This function is used in the moment estimation of scaled F-distributions.
Imported from statmod in R limma.
Parameters
----------
x : array_like
Input values. Should be positive.
Returns
-------
ndarray or float
log(x) - digamma(x)
"""
x = np.asarray(x)
return np.log(x) - digamma(x)
[docs]
def qqt(
y: np.ndarray,
df: float = np.inf,
plot_it: bool = True,
**kwargs,
) -> dict:
"""
Student's t probability plot (Q-Q plot).
Produces a Q-Q plot comparing sample quantiles against theoretical
quantiles from a t-distribution.
Parameters
----------
y : array_like
Sample values (e.g., t-statistics).
df : float, default np.inf
Degrees of freedom for the t-distribution.
Use np.inf for a normal distribution.
plot_it : bool, default True
If True, produce the Q-Q plot. Requires matplotlib.
**kwargs
Additional arguments passed to matplotlib's plot function.
Returns
-------
dict
x : ndarray
Theoretical quantiles.
y : ndarray
Sample quantiles (sorted input values).
Examples
--------
>>> fit = e_bayes(lm_fit(expr, design))
>>> qqt(fit["t"][:, 0], df=fit["df_total"][0])
"""
from scipy import stats
y = np.asarray(y)
y = y[~np.isnan(y)]
n = len(y)
if n == 0:
raise ValueError("y is empty or all NaN")
# Probability points
p = (np.arange(1, n + 1) - 0.5) / n
# Theoretical quantiles
x = stats.t.ppf(p, df)[np.argsort(np.argsort(y))]
if plot_it:
try:
import matplotlib.pyplot as plt
plot_kwargs = {
"marker": "o",
"linestyle": "none",
"markersize": 3,
}
plot_kwargs.update(kwargs)
fig, ax = plt.subplots()
ax.plot(x, y, **plot_kwargs)
# Add reference line
ylim = ax.get_ylim()
xlim = ax.get_xlim()
min_val = max(xlim[0], ylim[0])
max_val = min(xlim[1], ylim[1])
ax.plot([min_val, max_val], [min_val, max_val], "r--", alpha=0.7)
ax.set_xlabel("Theoretical Quantiles")
ax.set_ylabel("Sample Quantiles")
title = "Student's t Q-Q Plot" if np.isfinite(df) else "Normal Q-Q Plot"
ax.set_title(title)
plt.tight_layout()
except ImportError:
import warnings
warnings.warn("matplotlib not available for plotting")
return {"x": x, "y": y}
def qqf(
y: np.ndarray,
df1: float,
df2: float,
plot_it: bool = True,
**kwargs,
) -> dict:
"""
F-distribution probability plot (Q-Q plot).
Produces a Q-Q plot comparing sample quantiles against theoretical
quantiles from an F-distribution.
Parameters
----------
y : array_like
Sample values (e.g., F-statistics or variance ratios).
df1 : float
Numerator degrees of freedom.
df2 : float
Denominator degrees of freedom.
plot_it : bool, default True
If True, produce the Q-Q plot. Requires matplotlib.
**kwargs
Additional arguments passed to matplotlib's plot function.
Returns
-------
dict
x : ndarray
Theoretical quantiles.
y : ndarray
Sample quantiles (sorted input values).
"""
from scipy import stats
y = np.asarray(y)
y = y[~np.isnan(y)]
n = len(y)
if n == 0:
raise ValueError("y is empty or all NaN")
# Probability points
p = (np.arange(1, n + 1) - 0.5) / n
# Theoretical quantiles
x = stats.f.ppf(p, df1, df2)[np.argsort(np.argsort(y))]
if plot_it:
try:
import matplotlib.pyplot as plt
plot_kwargs = {
"marker": "o",
"linestyle": "none",
"markersize": 3,
}
plot_kwargs.update(kwargs)
fig, ax = plt.subplots()
ax.plot(x, y, **plot_kwargs)
# Add reference line
ylim = ax.get_ylim()
xlim = ax.get_xlim()
min_val = max(xlim[0], ylim[0])
max_val = min(xlim[1], ylim[1])
ax.plot([min_val, max_val], [min_val, max_val], "r--", alpha=0.7)
ax.set_xlabel("Theoretical Quantiles")
ax.set_ylabel("Sample Quantiles")
ax.set_title("F Distribution Q-Q Plot")
plt.tight_layout()
except ImportError:
import warnings
warnings.warn("matplotlib not available for plotting")
return {"x": x, "y": y}
def choose_lowess_span(
n: int = 1000,
small_n: int = 50,
min_span: float = 0.3,
power: float = 1 / 3,
) -> float:
"""
Choose optimal span for LOWESS smoothing of variance trends.
Larger spans are used for small datasets and smaller spans for larger
datasets.
Parameters
----------
n : int, default 1000
Number of data points.
small_n : int, default 50
Threshold below which maximum smoothing is used.
min_span : float, default 0.3
Minimum span for large datasets.
power : float, default 1/3
Power for scaling between small and large datasets.
Returns
-------
float
Recommended LOWESS span parameter.
"""
return min(min_span + (1 - min_span) * (small_n / n) ** power, 1.0)
def _find_span_window(
idx: int,
x_sorted: np.ndarray,
weights_sorted: np.ndarray,
span_weight: float,
n: int,
) -> tuple:
"""
Find the span window for a given point using cumulative weights.
Extends left and right from idx until cumulative weight reaches span_weight.
Returns (left, right, max_dist) where max_dist is the maximum distance
to either boundary.
"""
left = idx
right = idx
cur_weight = weights_sorted[idx]
max_dist = 0.0
at_start = left == 0
at_end = right == n - 1
while cur_weight < span_weight and (not at_start or not at_end):
if at_end:
# Can only extend left
left -= 1
cur_weight += weights_sorted[left]
if left == 0:
at_start = True
ldist = x_sorted[idx] - x_sorted[left]
max_dist = max(max_dist, ldist)
elif at_start:
# Can only extend right
right += 1
cur_weight += weights_sorted[right]
if right == n - 1:
at_end = True
rdist = x_sorted[right] - x_sorted[idx]
max_dist = max(max_dist, rdist)
else:
# Extend in direction of closer point
ldist = x_sorted[idx] - x_sorted[left - 1]
rdist = x_sorted[right + 1] - x_sorted[idx]
if ldist < rdist:
left -= 1
cur_weight += weights_sorted[left]
if left == 0:
at_start = True
max_dist = max(max_dist, ldist)
else:
right += 1
cur_weight += weights_sorted[right]
if right == n - 1:
at_end = True
max_dist = max(max_dist, rdist)
# Extend to include tied x values
while left > 0 and x_sorted[left] == x_sorted[left - 1]:
left -= 1
while right < n - 1 and x_sorted[right] == x_sorted[right + 1]:
right += 1
return left, right, max_dist
def _weighted_local_regression(
x_fit: float,
x_window: np.ndarray,
y_window: np.ndarray,
obs_weights: np.ndarray,
rob_weights: np.ndarray,
max_dist: float,
) -> float:
"""
Fit weighted local linear regression at a single point.
Combined weight = tricube(distance) * observation_weight * robustness_weight
"""
threshold = 1e-7
# If max_dist is tiny, return weighted mean
if max_dist < threshold:
w = obs_weights * rob_weights
total_w = np.sum(w)
if total_w == 0:
return 0.0
return np.sum(y_window * w) / total_w
# Tricube kernel weights: (1 - |u|^3)^3
u = np.abs(x_window - x_fit) / max_dist
kernel_w = np.where(u < 1, (1 - u**3) ** 3, 0.0)
# Combined weights
w = kernel_w * obs_weights * rob_weights
total_w = np.sum(w)
if total_w == 0:
return 0.0
# Weighted means
x_mean = np.sum(w * x_window) / total_w
y_mean = np.sum(w * y_window) / total_w
# Weighted variance and covariance for local linear fit
x_centered = x_window - x_mean
var = np.sum(w * x_centered**2)
covar = np.sum(w * x_centered * (y_window - y_mean))
# If variance is tiny, return weighted mean
if var < threshold:
return y_mean
# Local linear fit: y = slope * x + intercept
slope = covar / var
return slope * x_fit + y_mean - slope * x_mean
def _weighted_median_abs_deviation(
residuals: np.ndarray,
weights: np.ndarray,
) -> float:
"""Compute weighted median absolute deviation scaled by 6."""
order = np.argsort(np.abs(residuals))
sorted_resid = np.abs(residuals[order])
sorted_weights = weights[order]
total_weight = np.sum(sorted_weights)
half_weight = total_weight / 2.0
cumw = 0.0
for i in range(len(residuals)):
cumw += sorted_weights[i]
if cumw == half_weight and i < len(residuals) - 1:
return 3.0 * (sorted_resid[i] + sorted_resid[i + 1])
elif cumw > half_weight:
return 6.0 * sorted_resid[i]
return 6.0 * sorted_resid[-1]
def loess_fit(
y: np.ndarray,
x: np.ndarray,
weights: np.ndarray | None = None,
span: float = 0.3,
iterations: int = 4,
min_weight: float = 1e-5,
max_weight: float = 1e5,
) -> dict:
"""
Weighted LOWESS fit for univariate x and y.
Implements weighted local regression matching R limma's weightedLowess.
Observation weights affect both span window selection (which neighbours
to include) and the local regression fitting.
Parameters
----------
y : array_like
Response values.
x : array_like
Predictor values.
weights : array_like, optional
Observation weights. Higher weights give more influence to points
and effectively make them "count more" when determining the span
neighbourhood.
span : float, default 0.3
Smoothing parameter (fraction of total weight used for each fit).
iterations : int, default 4
Number of robustifying iterations (1 = no robustness iterations).
min_weight : float, default 1e-5
Minimum weight value.
max_weight : float, default 1e5
Maximum weight value.
Returns
-------
dict
fitted : ndarray
Fitted values.
residuals : ndarray
Residuals (y - fitted).
"""
y = np.asarray(y, dtype=np.float64)
x = np.asarray(x, dtype=np.float64)
n = len(y)
if len(x) != n:
raise ValueError("x and y have different lengths")
# Initialize output
fitted = np.full(n, np.nan)
residuals = np.full(n, np.nan)
# Find valid observations
obs = np.isfinite(y) & np.isfinite(x)
if not np.any(obs):
return {"fitted": fitted, "residuals": residuals}
x_obs = x[obs]
y_obs = y[obs]
n_obs = int(np.sum(obs))
# Check span
if span < 1 / n_obs:
fitted[obs] = y_obs
residuals[obs] = 0
return {"fitted": fitted, "residuals": residuals}
# Handle weights
if weights is not None:
weights = np.asarray(weights, dtype=np.float64)
if len(weights) != n:
raise ValueError("y and weights have different lengths")
w_obs = weights[obs].copy()
w_obs = np.nan_to_num(w_obs, nan=0)
w_obs = np.clip(w_obs, min_weight, max_weight)
else:
w_obs = np.ones(n_obs, dtype=np.float64)
# Sort by x (stable sort to match R behaviour)
order = np.argsort(x_obs, kind="mergesort")
x_sorted = x_obs[order]
y_sorted = y_obs[order]
w_sorted = w_obs[order]
# Compute span weight threshold
total_weight = np.sum(w_sorted)
span_weight = total_weight * span
# Precompute span windows for all points
windows = [_find_span_window(i, x_sorted, w_sorted, span_weight, n_obs) for i in range(n_obs)]
# Initialize robustness weights
rob_weights = np.ones(n_obs, dtype=np.float64)
fitted_sorted = np.zeros(n_obs, dtype=np.float64)
# Iterative fitting with robustness weights
threshold = 1e-7
for _it in range(iterations):
# Fit at each point
for i in range(n_obs):
left, right, max_dist = windows[i]
fitted_sorted[i] = _weighted_local_regression(
x_sorted[i],
x_sorted[left : right + 1],
y_sorted[left : right + 1],
w_sorted[left : right + 1],
rob_weights[left : right + 1],
max_dist,
)
# Last iteration doesn't need robustness weight update
if _it == iterations - 1:
break
# Compute residuals and update robustness weights
resid = y_sorted - fitted_sorted
abs_resid = np.abs(resid)
resid_scale = np.mean(abs_resid)
# Weighted median absolute deviation
cmad = _weighted_median_abs_deviation(resid, w_sorted)
# Check convergence
if cmad <= threshold * resid_scale:
break
# Update robustness weights using bisquare kernel
u = abs_resid / cmad
rob_weights = np.where(u < 1, (1 - u**2) ** 2, 0.0)
# Map back to original order
inv_order = np.argsort(order)
fitted_obs = fitted_sorted[inv_order]
fitted[obs] = fitted_obs
residuals[obs] = y_obs - fitted_obs
return {"fitted": fitted, "residuals": residuals}
[docs]
def weighted_lowess(
x: np.ndarray,
y: np.ndarray,
weights: np.ndarray | None = None,
delta: float | None = None,
npts: int = 200,
span: float = 0.3,
iterations: int = 4,
output_style: str = "loess",
) -> dict:
"""
Weighted LOWESS smoother - R-compatible entry point.
Thin wrapper around :func:`loess_fit` that mirrors R limma's
``weightedLowess(x, y, ...)`` argument order and parameter names.
Use this when porting R code that calls ``weightedLowess``
directly. New Python code should prefer :func:`loess_fit`.
Parameters
----------
x, y : array_like
Predictor and response (R's argument order - note that
:func:`loess_fit` takes ``(y, x)``).
weights : array_like, optional
Per-observation weights.
delta : float, optional
Clustering tolerance for the anchor-point-based smoother.
Ignored by the current implementation, which uses the
exact (non-clustered) weighted LOWESS algorithm. A warning is
emitted if supplied.
npts : int, default 200
Number of anchor points. Ignored by the current
implementation for the same reason as ``delta``. A warning is
emitted if the caller supplies a non-default value.
span, iterations :
See :func:`loess_fit`.
output_style : {"loess", "lowess"}, default "loess"
- ``"loess"``: return ``{"fitted", "residuals", "weights",
"delta"}`` in the caller's original point order (matches
R's ``loess()`` / ``loessFit()`` output shape and
:func:`loess_fit`).
- ``"lowess"``: return ``{"x", "y", "delta"}`` with both
arrays sorted by ``x`` ascending (matches R's ``lowess()``
output shape).
Returns
-------
dict
Schema depends on ``output_style`` - see above.
"""
import warnings as _warnings
if delta is not None:
_warnings.warn(
"weighted_lowess: 'delta' is ignored by pylimma's exact weighted LOWESS implementation",
stacklevel=2,
)
if npts != 200:
_warnings.warn(
"weighted_lowess: 'npts' is ignored by pylimma's exact "
"weighted LOWESS implementation (no anchor-point "
"clustering is performed)",
stacklevel=2,
)
output_style = output_style.lower()
if output_style not in ("loess", "lowess"):
raise ValueError(f"output_style must be 'loess' or 'lowess', got {output_style!r}")
fit = loess_fit(y, x, weights=weights, span=span, iterations=iterations)
if output_style == "loess":
fit["delta"] = delta
if weights is not None:
fit["weights"] = np.asarray(weights, dtype=np.float64)
return fit
# "lowess" output: order-by-x, drop residuals, expose {x, y, delta}.
x_arr = np.asarray(x, dtype=np.float64)
order = np.argsort(x_arr, kind="stable")
return {
"x": x_arr[order],
"y": np.asarray(fit["fitted"], dtype=np.float64)[order],
"delta": delta,
}
def p_adjust(p: np.ndarray, method: str = "BH") -> np.ndarray:
"""
Adjust p-values for multiple testing.
Wrapper around statsmodels multipletests for convenience.
Parameters
----------
p : array_like
Raw p-values.
method : str, default "BH"
Correction method. Options: "BH" (Benjamini-Hochberg), "BY"
(Benjamini-Yekutieli), "holm", "hochberg", "hommel",
"bonferroni", "none".
Returns
-------
ndarray
Adjusted p-values.
"""
from statsmodels.stats.multitest import multipletests
p = np.asarray(p, dtype=np.float64)
m = method.lower()
if m == "none":
return p.copy()
valid_methods = {"bh", "fdr", "by", "bonferroni", "holm", "hommel", "hochberg"}
if m not in valid_methods:
raise ValueError(
f"method '{method}' not recognized. Valid methods: "
"'BH', 'BY', 'holm', 'hochberg', 'hommel', 'bonferroni', 'none'"
)
valid = ~np.isnan(p)
if not np.any(valid):
return p.copy()
adjusted = np.full_like(p, np.nan)
pv = p[valid]
if m == "hochberg":
# Port of R's p.adjust(method="hochberg"): step-up procedure.
# R code: pmin(1, cummin((n - i + 1L) * p[o]))[ro]
# where o = order(p, decreasing=TRUE), i = n:1.
n = pv.size
o = np.argsort(-pv, kind="stable")
ro = np.argsort(o, kind="stable")
multipliers = np.arange(1, n + 1, dtype=np.float64)
adj_sorted = np.minimum.accumulate(multipliers * pv[o])
adj_valid = np.minimum(adj_sorted[ro], 1.0)
else:
method_map = {
"bh": "fdr_bh",
"fdr": "fdr_bh",
"by": "fdr_by",
"bonferroni": "bonferroni",
"holm": "holm",
"hommel": "hommel",
}
_, adj_valid, _, _ = multipletests(pv, method=method_map[m])
adjusted[valid] = adj_valid
return adjusted
# ---------------------------------------------------------------------------
# zscore family (port of R limma's zscore, zscoreGamma, zscoreHyper)
# ---------------------------------------------------------------------------
_ZSCORE_DISTRIBUTIONS = {
"norm": _scipy_stats.norm,
"t": _scipy_stats.t,
"f": _scipy_stats.f,
"chisq": _scipy_stats.chi2,
"beta": _scipy_stats.beta,
"exp": _scipy_stats.expon,
"binom": _scipy_stats.binom,
"pois": _scipy_stats.poisson,
}
def zscore(
q: np.ndarray | float,
distribution: str,
**dist_kwargs,
) -> np.ndarray:
"""
Z-score equivalents for deviates from a specified distribution.
Port of R limma's ``zscore``. Computes the z-score of a value drawn
from ``distribution`` by mapping the corresponding tail probability
onto a standard normal quantile, preserving log-scale accuracy in
the tails.
Parameters
----------
q : array_like
Deviates whose z-score equivalents are wanted.
distribution : str
Distribution name. R's naming convention (without the leading
``p``) is used: ``"norm"``, ``"t"``, ``"f"``, ``"chisq"``,
``"beta"``, ``"exp"``, ``"binom"``, ``"pois"``. Use the
dedicated :func:`zscore_gamma` and :func:`zscore_hyper`
functions for the gamma and hypergeometric cases.
**dist_kwargs
Distribution parameters passed through to scipy.stats. Names
mirror R's ``p<distribution>`` arguments (e.g. ``df`` for
``"t"``; ``df1`` and ``df2`` for ``"f"``).
Returns
-------
ndarray
Z-scores with the same shape as ``q``.
"""
from scipy.special import ndtri_exp
if distribution == "gamma":
return zscore_gamma(q, **dist_kwargs)
if distribution == "hyper":
return zscore_hyper(q, **dist_kwargs)
if distribution not in _ZSCORE_DISTRIBUTIONS:
raise ValueError(
f"distribution '{distribution}' not recognised. "
"Must be one of 'norm', 't', 'f', 'gamma', 'hyper', "
"'chisq', 'beta', 'exp', 'binom', 'pois'."
)
dist = _ZSCORE_DISTRIBUTIONS[distribution]
q = np.asarray(q, dtype=np.float64)
z = q.astype(np.float64, copy=True)
pupper = dist.logsf(q, **dist_kwargs)
plower = dist.logcdf(q, **dist_kwargs)
up = pupper < plower
if np.any(up):
z[up] = -ndtri_exp(pupper[up])
if np.any(~up):
z[~up] = ndtri_exp(plower[~up])
return z
def zscore_gamma(
q: np.ndarray | float,
shape: np.ndarray | float,
rate: np.ndarray | float = 1.0,
scale: np.ndarray | float | None = None,
) -> np.ndarray:
"""
Z-score equivalents for gamma deviates.
Port of R limma's ``zscoreGamma``.
Parameters
----------
q : array_like
Gamma deviates.
shape : array_like or float
Shape parameter. Recycled to the length of ``q``.
rate : array_like or float, default 1.0
Rate parameter. Used only if ``scale`` is omitted; ``scale``
defaults to ``1 / rate`` to mirror R's argument convention.
scale : array_like or float, optional
Scale parameter (``1 / rate``). When supplied takes precedence
over ``rate``.
Returns
-------
ndarray
Z-score equivalents with the same shape as ``q``.
"""
from scipy.special import ndtri_exp
q = np.asarray(q, dtype=np.float64)
n = q.size
shape = np.broadcast_to(np.asarray(shape, dtype=np.float64), n).copy()
if scale is None:
scale = 1.0 / np.asarray(rate, dtype=np.float64)
scale = np.broadcast_to(np.asarray(scale, dtype=np.float64), n).copy()
z = q.astype(np.float64, copy=True)
q_flat = q.reshape(-1)
z_flat = z.reshape(-1)
up = q_flat > shape * scale
if np.any(up):
logp = _scipy_stats.gamma.logsf(q_flat[up], a=shape[up], scale=scale[up])
z_flat[up] = -ndtri_exp(logp)
if np.any(~up):
logp = _scipy_stats.gamma.logcdf(q_flat[~up], a=shape[~up], scale=scale[~up])
z_flat[~up] = ndtri_exp(logp)
return z
def zscore_hyper(
q: np.ndarray | float,
m: np.ndarray | float,
n: np.ndarray | float,
k: np.ndarray | float,
) -> np.ndarray:
"""
Z-score equivalents for hypergeometric deviates.
Port of R limma's ``zscoreHyper``. Adds a half point-probability
correction to either the upper or lower tail before mapping to a
standard normal quantile, preserving log-scale accuracy.
Parameters
----------
q : array_like
Number of successes observed.
m : array_like or float
Number of white balls in the urn.
n : array_like or float
Number of black balls in the urn.
k : array_like or float
Number of balls drawn.
Returns
-------
ndarray
Z-score equivalents with the same shape as ``q``.
"""
from scipy.special import ndtri_exp
q = np.asarray(q, dtype=np.float64)
z = q.astype(np.float64, copy=True)
M = np.asarray(m, dtype=np.float64) + np.asarray(n, dtype=np.float64)
m_arr = np.asarray(m, dtype=np.float64)
k_arr = np.asarray(k, dtype=np.float64)
with np.errstate(invalid="ignore"):
d = _scipy_stats.hypergeom.logpmf(q, M, m_arr, k_arr) - np.log(2.0)
pupper = _scipy_stats.hypergeom.logsf(q, M, m_arr, k_arr)
plower = _scipy_stats.hypergeom.logcdf(q - 1, M, m_arr, k_arr)
d = np.where(np.isnan(d), -np.inf, d)
pupper = np.where(np.isnan(pupper), -np.inf, pupper)
plower = np.where(np.isnan(plower), -np.inf, plower)
# Add half point probability to upper tail in log-space.
a = np.where(d > pupper, d, pupper)
b = -np.abs(d - pupper)
pmidupper = a + np.log1p(np.exp(b))
inf_a = np.isinf(a)
pmidupper = np.where(inf_a, a, pmidupper)
# Similarly for lower tail.
a = np.where(d > plower, d, plower)
b = -np.abs(d - plower)
pmidlower = a + np.log1p(np.exp(b))
inf_a = np.isinf(a)
pmidlower = np.where(inf_a, a, pmidlower)
up = pmidupper < pmidlower
if np.any(up):
z[up] = -ndtri_exp(pmidupper[up])
if np.any(~up):
z[~up] = ndtri_exp(pmidlower[~up])
return z
# ---------------------------------------------------------------------------
# zscore_t (port of R limma's zscoreT and its dot-prefixed helpers)
# ---------------------------------------------------------------------------
def _zscore_t_quantile(x: np.ndarray, df: np.ndarray) -> np.ndarray:
# qnorm(pt(abs(x), df, lower.tail=FALSE, log.p=TRUE),
# lower.tail=FALSE, log.p=TRUE) * sign(x)
logp = _scipy_stats.t.logsf(np.abs(x), df)
return _scipy_stats.norm.isf(np.exp(logp)) * np.sign(x)
def _zscore_t_wallace(x: np.ndarray, df: np.ndarray) -> np.ndarray:
return (df + 0.125) / (df + 0.375) * np.sqrt(df * np.log1p(x / df * x)) * np.sign(x)
def _zscore_t_bailey(x: np.ndarray, df: np.ndarray) -> np.ndarray:
return (
(df + 0.125)
/ (df + 1.125)
* np.sqrt((df + 19.0 / 12.0) * np.log1p(x / (df + 1.0 / 12.0) * x))
* np.sign(x)
)
def _zscore_t_hill(x: np.ndarray, df: np.ndarray) -> np.ndarray:
A = df - 0.5
B = 48.0 * A * A
z = A * np.log1p(x / df * x)
z = (
((((-0.4 * z - 3.3) * z - 24.0) * z - 85.5) / (0.8 * z * z + 100.0 + B) + z + 3.0) / B + 1.0
) * np.sqrt(z)
return z * np.sign(x)
[docs]
def zscore_t(
x: np.ndarray,
df: np.ndarray | float,
approx: bool = False,
method: str = "bailey",
) -> np.ndarray:
"""
Z-score equivalents of t-statistics.
Port of R limma's ``zscoreT``. When ``approx=True`` the calculation uses
one of three closed-form approximations selected by ``method``
(``"bailey"``, ``"hill"``, ``"wallace"``). When ``approx=False`` the
z-scores are computed via the exact quantile transformation using the
log-scale t-distribution survival function; ``method`` is ignored on
that path.
Parameters
----------
x : array_like
t-statistics (any shape).
df : array_like or float
Degrees of freedom. Broadcasts against ``x``.
approx : bool, default False
If True, use a closed-form approximation.
method : {"bailey", "hill", "wallace"}, default "bailey"
Approximation to use when ``approx=True``. Ignored when
``approx=False``.
Returns
-------
ndarray
Z-score equivalents with the same shape as ``x``.
"""
x = np.asarray(x, dtype=np.float64)
df = np.asarray(df, dtype=np.float64)
df = np.broadcast_to(df, x.shape).astype(np.float64, copy=True)
if approx:
df = np.minimum(df, 1e100)
if method not in ("bailey", "hill", "wallace"):
raise ValueError(
f"method '{method}' not recognized. Must be one of 'bailey', 'hill', 'wallace'."
)
if method == "bailey":
return _zscore_t_bailey(x, df)
if method == "hill":
return _zscore_t_hill(x, df)
return _zscore_t_wallace(x, df)
return _zscore_t_quantile(x, df)
# ---------------------------------------------------------------------------
# tricube_moving_average (port of R limma's tricubeMovingAverage)
# ---------------------------------------------------------------------------
[docs]
def tricube_moving_average(
x: np.ndarray,
span: float = 0.5,
power: int = 3,
) -> np.ndarray:
"""
Moving average filter with tricube weights for a time series.
Port of R limma's ``tricubeMovingAverage``.
Parameters
----------
x : array_like
1-D series.
span : float, default 0.5
Fraction of ``x`` to include in the smoothing window (clamped to
``(0, 1]``; values <= 0 return ``x`` unchanged).
power : int, default 3
Power applied to the tricube weights.
Returns
-------
ndarray
Smoothed series of the same length as ``x``.
"""
import warnings
if hasattr(span, "__len__") and len(span) > 1:
warnings.warn("only first value of span used")
span = span[0]
if span > 1:
span = 1.0
x = np.asarray(x, dtype=np.float64)
if span <= 0:
return x.copy()
if hasattr(power, "__len__") and len(power) > 1:
warnings.warn("only first value of power used")
power = power[0]
if power < 0:
power = 0
n = x.size
width_f = span * n
hwidth = int(width_f) // 2
width = 2 * hwidth + 1
if width > n:
width -= 2
hwidth -= 1
if hwidth <= 0:
return x.copy()
u = np.linspace(-1.0, 1.0, width) * width / (width + 1)
weights = (1 - np.abs(u) ** 3) ** power
weights = weights / np.sum(weights)
# R's filter(..., convolution) centres the kernel on each point and
# uses the kernel as-is (no flipping for symmetric tricube weights).
# Extend the series with hwidth zeros on each side and convolve.
z = np.zeros(hwidth)
padded = np.concatenate([z, x, z])
# np.convolve with mode='valid' produces length (len(padded) - width + 1)
smoothed = np.convolve(padded, weights, mode="valid")
# R indexes (hwidth+1):(n+hwidth) in 1-based -> 0:n in 0-based from
# the valid-mode result (which starts at the position where the kernel
# fits entirely, corresponding to R's output at index hwidth+1).
out = smoothed[:n].copy()
# Rescale boundary values to remove influence of outside zeros
cw = np.cumsum(weights)
# R: x[1:hwidth] / cw[(width-hwidth):(width-1)] (1-based, inclusive)
# 0-based: out[0:hwidth] /= cw[width-hwidth-1:width-1]
out[:hwidth] = out[:hwidth] / cw[width - hwidth - 1 : width - 1]
# R: x[(n-hwidth+1):n] / cw[(width-1):(width-hwidth)] (descending)
# 0-based: out[n-hwidth:n] /= cw[width-2:width-hwidth-2:-1]
# Construct the descending slice via index arithmetic.
idx = np.arange(width - 2, width - hwidth - 2, -1)
out[n - hwidth : n] = out[n - hwidth : n] / cw[idx]
return out
# ---------------------------------------------------------------------------
# convest (port of R limma's convest)
# ---------------------------------------------------------------------------
[docs]
def convest(
p: np.ndarray,
niter: int = 100,
plot: bool = False,
report: bool = False,
tol: float = 1e-6,
file: str = "",
) -> float:
"""
Estimate pi0 using a convex decreasing density estimate.
Port of R limma's ``convest``. The ``file`` argument controls where
the diagnostic report is written when ``report=True``: the default
empty string writes to stdout (matching R's ``file=""`` convention);
any non-empty string is opened as a path.
Parameters
----------
p : array_like
Observed p-values in [0, 1].
niter : int, default 100
Number of iterations of the EM-like algorithm.
plot : bool, default False
If True, draw the estimated density at each iteration.
report : bool, default False
If True, print a per-iteration diagnostic table.
tol : float, default 1e-6
Accuracy of the bisection search for the convex combination.
file : str, default ""
Destination for the report. Empty string -> stdout.
Returns
-------
float
Estimated proportion of true null hypotheses.
"""
p = np.asarray(p, dtype=np.float64)
if p.size == 0:
return float("nan")
if np.any(np.isnan(p)):
raise ValueError("Missing values in p not allowed")
if np.any((p < 0) | (p > 1)):
raise ValueError("All p-values must be between 0 and 1")
k = int(niter)
ny = float(tol)
p = np.sort(p)
m = p.size
p_c = np.ceil(100.0 * p) / 100.0
p_f = np.floor(100.0 * p) / 100.0
t_grid = np.arange(1, 101) / 100.0
x_grid = np.arange(0, 101) / 100.0
f_hat = np.ones(101)
f_hat_p = np.ones(m)
# theta.hat = 0.01 * which.max(apply(..., sum((2*(theta-p)*(p<theta)/theta^2))))
def _argmax_theta(f_hat_p_cur):
# For each theta in t_grid, compute sum((2*(theta-p)*(p<theta)/theta^2)/f_hat_p_cur)
# but the initial theta.hat uses sum without division.
vals = np.empty(100)
for i, theta in enumerate(t_grid):
mask = p < theta
vals[i] = np.sum((2.0 * (theta - p[mask]) / theta**2) / f_hat_p_cur[mask])
return 0.01 * (int(np.argmax(vals)) + 1)
# Initial theta.hat with f_hat_p = 1
theta_hat = _argmax_theta(np.ones_like(p))
f_theta_hat = 2.0 * (theta_hat - x_grid) * (x_grid < theta_hat) / theta_hat**2
f_theta_hat_p = 2.0 * (theta_hat - p) * (p < theta_hat) / theta_hat**2
thetas = []
# Report output stream
report_fh = None
if report:
if file == "":
report_fh = sys.stdout
report_close = False
else:
report_fh = open(file, "w")
report_close = True
report_fh.write("j\tpi0\ttheta.hat\t\tepsilon\tD\n")
pi_0_hat = 1.0
for j in range(1, k + 1):
f_hat_diff = f_hat_p - f_theta_hat_p
if np.sum(f_hat_diff / f_hat_p) > 0:
eps = 0.0
else:
l = 0.0
u = 1.0
while abs(u - l) > ny:
eps = (l + u) / 2.0
# Only include elements where f.hat.p > 0 (R parity)
mask_pos = f_hat_p > 0
denom = (1.0 - eps) * f_hat_p[mask_pos] + eps * f_theta_hat_p[mask_pos]
if np.sum(f_hat_diff[mask_pos] / denom) < 0:
l = eps
else:
u = eps
f_hat = (1.0 - eps) * f_hat + eps * f_theta_hat
pi_0_hat = f_hat[100]
d = np.sum(f_hat_diff / f_hat_p)
if report:
report_fh.write(f"{j}\t{pi_0_hat}\t{theta_hat}\t{eps}\t{d}\n")
# f_hat_p <- 100*(f_hat[100*p_f+1] - f_hat[100*p_c+1])*(p_c-p) + f_hat[100*p_c+1]
idx_f = (100.0 * p_f).astype(int)
idx_c = (100.0 * p_c).astype(int)
f_hat_p = 100.0 * (f_hat[idx_f] - f_hat[idx_c]) * (p_c - p) + f_hat[idx_c]
# Pick new theta.hat
vals = np.empty(100)
for i, theta in enumerate(t_grid):
mask = p < theta
vals[i] = np.sum((2.0 * (theta - p[mask]) / theta**2) / f_hat_p[mask])
theta_hat = 0.01 * (int(np.argmax(vals)) + 1)
f_theta_hat = 2.0 * (theta_hat - x_grid) * (x_grid < theta_hat) / theta_hat**2
f_theta_hat_p = 2.0 * (theta_hat - p) * (p < theta_hat) / theta_hat**2
# Check if the Unif[0,1]-density is the new f_theta_hat
if np.sum(f_theta_hat_p / f_hat_p) < np.sum(1.0 / f_hat_p):
theta_hat = 0.0
f_theta_hat = np.ones(101)
f_theta_hat_p = np.ones(m)
if theta_hat not in thetas:
thetas.append(theta_hat)
thetas.sort()
if plot:
try:
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(x_grid, f_hat)
ax.set_ylim(0, 1.2)
ax.set_title(f"{round(pi_0_hat, 5):.5f}")
theta_arr = np.asarray(thetas)
ax.plot(
theta_arr,
f_hat[(100.0 * theta_arr).astype(int)],
"bo",
)
plt.show(block=False)
except ImportError:
pass
if report and report_close:
report_fh.close()
return float(pi_0_hat)
# ---------------------------------------------------------------------------
# prop_true_null (port of R limma's propTrueNull)
# ---------------------------------------------------------------------------
def _prop_true_null_by_local_fdr(p: np.ndarray) -> float:
n = p.size
# R: i <- n:1L; sort(p, decreasing=TRUE); q <- pmin(n/i * p, 1)
p_sorted = np.sort(p)[::-1]
i = np.arange(n, 0, -1, dtype=np.float64)
q = np.minimum(n / i * p_sorted, 1.0)
n1 = n + 1
return float(np.sum(i * q) / n / n1 * 2.0)
def _prop_true_null_by_mean_p(p: np.ndarray) -> float:
n = p.size
p_sorted = np.sort(p)
i = np.arange(1, n + 1, dtype=np.float64)
q = np.minimum(p_sorted, (i - 0.5) / n)
return float(2.0 * np.mean(q))
def _prop_true_null_from_histogram(p: np.ndarray, nbins: int = 20) -> float:
# bin <- c(-0.1, (1:nbins)/nbins)
edges = np.concatenate([[-0.1], np.arange(1, nbins + 1) / nbins])
# tab <- tabulate(cut(p, bin)) -> counts in each bin
bin_counts = np.zeros(nbins, dtype=np.int64)
# np.histogram uses (left, right] by default (include right edge), matching
# R's cut() default right=TRUE.
counts, _ = np.histogram(p, bins=edges)
bin_counts[: counts.size] = counts
# tail.means <- rev(cumsum(rev(bin.counts))/(1:nbins))
rev_counts = bin_counts[::-1]
cum = np.cumsum(rev_counts)
denom = np.arange(1, nbins + 1, dtype=np.float64)
tail_means = (cum / denom)[::-1]
# index <- which(tail.means >= bin.counts)[1]
mask = tail_means >= bin_counts
idx = int(np.argmax(mask)) # argmax returns first True
# Guard: if mask is all False, argmax returns 0 but mask[0] is False
if not mask[idx]:
return float("nan")
if tail_means[0] == 0:
return float("nan")
return float(tail_means[idx] / tail_means[0])
[docs]
def prop_true_null(
p: np.ndarray,
method: str = "lfdr",
nbins: int = 20,
**convest_kwargs,
) -> float:
"""
Estimate the proportion of null p-values.
Port of R limma's ``propTrueNull``. Dispatches to one of four
estimators.
Parameters
----------
p : array_like
Observed p-values.
method : {"lfdr", "mean", "hist", "convest"}, default "lfdr"
Estimator to use.
nbins : int, default 20
Number of histogram bins when ``method="hist"``.
**convest_kwargs
Extra keyword arguments passed to :func:`convest` when
``method="convest"``.
Returns
-------
float
Estimated proportion of true nulls.
"""
p = np.asarray(p, dtype=np.float64)
if method not in ("lfdr", "mean", "hist", "convest"):
raise ValueError(
f"method '{method}' not recognized. Must be one of 'lfdr', 'mean', 'hist', 'convest'."
)
if method == "lfdr":
return _prop_true_null_by_local_fdr(p)
if method == "mean":
return _prop_true_null_by_mean_p(p)
if method == "hist":
return _prop_true_null_from_histogram(p, nbins=nbins)
return convest(p, **convest_kwargs)
# ---------------------------------------------------------------------------
# detection_p_values (port of R limma's detectionPValues.default)
# ---------------------------------------------------------------------------
[docs]
def detection_p_values(
x: np.ndarray,
status: np.ndarray | list,
negctrl: str = "negative",
) -> np.ndarray:
"""
Detection p-values from negative controls.
Port of R limma's ``detectionPValues.default`` (matrix path). Returns
a matrix the same shape as ``x`` containing per-probe detection
p-values computed from the empirical distribution of the negative
controls within each column.
Parameters
----------
x : array_like
Probe intensity matrix, shape (n_probes, n_arrays).
status : array_like
Per-probe status labels. Probes whose status equals ``negctrl``
are treated as negative controls.
negctrl : str, default "negative"
Label identifying negative controls in ``status``.
Returns
-------
ndarray
Detection p-values with the same shape as ``x``.
"""
x = np.asarray(x, dtype=np.float64)
if x.ndim == 1:
x = x.reshape(-1, 1)
status = np.asarray(status)
if status.size != x.shape[0]:
raise ValueError("length of status must match nrow(x)")
isneg = (status == negctrl).astype(np.int64)
notneg = 1 - isneg
nneg = int(np.sum(isneg))
if nneg == 0:
raise ValueError("No negative controls")
n_probes, n_arrays = x.shape
out = np.empty_like(x)
for j in range(n_arrays):
col = x[:, j]
# R: order(x[,j], isneg, decreasing=TRUE)
# stable sort by -x then by -isneg (because decreasing=TRUE for both)
# Use lexsort: last key is primary.
o1 = np.lexsort((-isneg, -col))
# R: order(x[,j], notneg, decreasing=TRUE)
o2 = np.lexsort((-notneg, -col))
cs1 = np.empty(n_probes, dtype=np.int64)
cs2 = np.empty(n_probes, dtype=np.int64)
cs1[o1] = np.cumsum(isneg[o1])
cs2[o2] = np.cumsum(isneg[o2])
out[:, j] = cs1 + cs2
return out / (2.0 * nneg)
# ============================================================================
# Simple utilities ported from R limma
# ============================================================================
def is_numeric(x) -> bool:
"""
Test whether the argument is numeric.
Port of R limma's ``isNumeric(x)`` (``utility.R``). True for numeric
arrays / scalars, and for DataFrames whose columns are all numeric.
"""
if isinstance(x, pd.DataFrame):
if x.shape[1] == 0:
return False
return bool(x.select_dtypes(include="number").shape[1] == x.shape[1])
try:
arr = np.asarray(x)
except Exception:
return False
return np.issubdtype(arr.dtype, np.number)
def block_diag(*matrices) -> np.ndarray:
"""
Build a block-diagonal matrix from the given inputs.
Port of R limma's ``blockDiag(...)`` (``utility.R``). Delegates to
``scipy.linalg.block_diag`` while matching R's behaviour of raising
when any argument is not 2-D.
"""
from scipy.linalg import block_diag as _scipy_block_diag
mats = []
for m in matrices:
arr = np.asarray(m)
if arr.ndim != 2:
raise ValueError("all arguments must be matrices")
mats.append(arr)
if not mats:
return np.zeros((0, 0))
return _scipy_block_diag(*mats)
def make_unique(x) -> np.ndarray:
"""
Disambiguate a string vector by appending zero-padded indices to any
value that repeats.
Port of R limma's ``makeUnique(x)`` (``combine.R``). If a value
occurs ``n`` times (``n > 1``) each occurrence is replaced with
``<value><k>`` where ``k`` is ``1..n`` left-padded to
``floor(log10(n)) + 1`` digits. Values that occur only once are
unchanged.
"""
x = np.asarray([str(v) for v in x], dtype=object)
# Count occurrences, order by first appearance to match R's table().
unique_vals, counts = np.unique(x, return_counts=True)
for val, n in zip(unique_vals, counts):
if n <= 1:
continue
width = 1 + int(np.floor(np.log10(n)))
positions = np.where(x == val)[0]
for k, pos in enumerate(positions, start=1):
x[pos] = f"{val}{k:0{width}d}"
return x
def logcosh(x) -> np.ndarray:
"""
Compute ``log(cosh(x))`` without floating-point over/underflow.
Port of R limma's ``logcosh`` (``logsumexp.R``).
"""
x = np.asarray(x, dtype=np.float64)
absx = np.abs(x)
y = absx - np.log(2.0)
# Small-argument Taylor
small = absx < 1e-4
y = np.where(small, 0.5 * x**2, y)
# Mid-range direct
mid = (~small) & (absx < 17.0)
if np.any(mid):
y = np.where(mid, np.log(np.cosh(np.where(mid, x, 0.0))), y)
return y
def logsumexp(x, y=None) -> np.ndarray:
"""
Compute ``log(exp(x) + exp(y))`` without floating over/underflow.
Port of R limma's ``logsumexp(x, y)`` (``logsumexp.R``). When only
one positional argument is given it reduces to the standard
``scipy.special.logsumexp`` of an array.
"""
if y is None:
from scipy.special import logsumexp as _lse
return _lse(np.asarray(x, dtype=np.float64))
x = np.asarray(x, dtype=np.float64)
y = np.asarray(y, dtype=np.float64)
x, y = np.broadcast_arrays(x, y)
mi = np.minimum(x, y)
ma = np.maximum(x, y)
result = np.empty_like(ma)
is_nan = np.isnan(ma)
inf_ma = np.isposinf(ma)
inf_mi = np.isneginf(mi)
special = is_nan | inf_ma | inf_mi
# Finite path
finite = ~special
if np.any(finite):
m = (x[finite] + y[finite]) / 2.0
result[finite] = m + logcosh(ma[finite] - m) + np.log(2.0)
# Special handling matching R: Inf in ma -> Inf; -Inf in mi -> ma;
# NaN -> propagate.
result[is_nan] = np.nan
result[inf_ma & ~is_nan] = np.inf
both_inf = inf_mi & ~is_nan & ~inf_ma
if np.any(both_inf):
result[both_inf] = ma[both_inf]
return result
def bwss(x, group) -> dict:
"""
Between- and within-group sums of squares.
Port of R limma's ``bwss(x, group)`` (``bwss.R``). Returns a dict
with ``bss``, ``wss``, ``bdf`` (between-group df = n_groups - 1),
and ``wdf`` (within-group df = sum(n_i - 1)).
"""
x = np.asarray(x, dtype=np.float64)
group = np.asarray(group)
keep = ~(np.isnan(x) | pd.isna(group))
x = x[keep]
group = group[keep]
if x.size == 0:
return {"bss": np.nan, "wss": np.nan, "bdf": np.nan, "wdf": np.nan}
unique, counts = np.unique(group, return_counts=True)
means = np.array([np.mean(x[group == g]) for g in unique])
# R uses ddof=1 sample variance; when n==1 variance is NaN
variances = np.array(
[np.var(x[group == g], ddof=1) if counts[i] > 1 else np.nan for i, g in enumerate(unique)]
)
keep_grp = counts > 0
means = means[keep_grp]
variances = variances[keep_grp]
counts = counts[keep_grp]
total_n = counts.sum()
grand_mean = np.sum(counts * means) / total_n
wss = float(np.nansum((counts - 1) * variances))
bss = float(np.sum(counts * (means - grand_mean) ** 2))
wdf = int(np.sum(counts - 1))
bdf = int(len(means) - 1)
return {"bss": bss, "wss": wss, "bdf": bdf, "wdf": wdf}
def pool_var(var, df=None, multiplier=None, n=None) -> dict:
"""
Pool sample variances allowing for unequal true variances via
Satterthwaite's method.
Port of R limma's ``poolVar(var, df=n-1, multiplier=1/n, n)``
(``poolvar.R``). ``df`` and ``multiplier`` default to ``n-1`` and
``1/n`` respectively when ``n`` is supplied - matching R's default
expressions.
"""
var = np.atleast_1d(np.asarray(var, dtype=np.float64))
if df is None:
if n is None:
raise ValueError("Either df or n must be provided")
df = np.atleast_1d(np.asarray(n, dtype=np.float64)) - 1.0
else:
df = np.atleast_1d(np.asarray(df, dtype=np.float64))
if multiplier is None:
if n is None:
raise ValueError("Either multiplier or n must be provided")
multiplier = 1.0 / np.atleast_1d(np.asarray(n, dtype=np.float64))
else:
multiplier = np.atleast_1d(np.asarray(multiplier, dtype=np.float64))
if np.min(multiplier) < 0:
raise ValueError("Multipliers must be non-negative")
if np.max(multiplier) == 0:
return {"var": 0.0, "df": 0.0, "multiplier": 0.0}
sm = float(np.sum(multiplier))
mnorm = multiplier / sm
pooled_var = float(np.sum(mnorm * var))
denom = float(np.sum(mnorm**2 * var**2 / df))
pooled_df = pooled_var**2 / denom if denom > 0 else float("inf")
return {"var": pooled_var, "df": pooled_df, "multiplier": sm}
def cum_overlap(ol1, ol2) -> dict:
"""
Cumulative-overlap analysis of two ordered ID lists.
Port of R limma's ``cumOverlap(ol1, ol2)`` (``cumOverlap.R``).
Returns a dict with ``n_total``, ``n_min``, ``p_min``,
``n_overlap``, ``id_overlap``, ``p_value``, ``adj_p_value``.
"""
from scipy import stats as _stats
ol1 = list(ol1)
ol2 = list(ol2)
if len(set(ol1)) != len(ol1):
raise ValueError("Duplicate IDs found in ol1")
if len(set(ol2)) != len(ol2):
raise ValueError("Duplicate IDs found in ol2")
set2 = set(ol2)
ol1 = [v for v in ol1 if v in set2]
set1 = set(ol1)
ol2 = [v for v in ol2 if v in set1]
ngenes = len(ol1)
if ngenes == 0:
return {"n_total": 0}
rank_map = {v: i for i, v in enumerate(ol2)}
m = np.array([rank_map[v] for v in ol1], dtype=np.int64)
noverlap = np.empty(ngenes, dtype=np.int64)
for j in range(ngenes):
noverlap[j] = int(np.sum(m[: j + 1] <= j))
i = np.arange(1, ngenes + 1, dtype=np.int64)
# phyper(noverlap - 0.5, m=i, n=ngenes - i, k=i, lower.tail=FALSE)
# = 1 - P(X <= noverlap - 1) under hypergeometric(good=i, bad=ngenes-i, drawn=i).
p = 1.0 - _stats.hypergeom.cdf(np.maximum(noverlap - 1, -1), ngenes, i, i)
p_b = p * i
nmin = int(np.argmin(p_b))
p_b = np.minimum(p_b, 1.0)
id_overlap = [ol1[j] for j in range(nmin + 1) if m[j] <= nmin]
return {
"n_total": ngenes,
"n_min": nmin + 1,
"p_min": float(p_b[nmin]),
"n_overlap": noverlap,
"id_overlap": id_overlap,
"p_value": p,
"adj_p_value": p_b,
}
def propexpr(
x,
neg_x=None,
status=None,
labels: tuple[str, str] = ("negative", "regular"),
) -> np.ndarray:
"""
Estimate the proportion of expressed probes on each array.
Port of R limma's ``propexpr`` (``propexpr.R``). Requires either
explicit negative-control probes ``neg_x`` or a ``status`` vector
that identifies ``labels[0]`` ("negative") and optionally
``labels[1]`` ("regular") probes.
"""
x_mat = np.asarray(x, dtype=np.float64)
if neg_x is None:
if status is None:
raise ValueError(
"Either neg_x or status must be supplied so the "
"negative-control probes can be located."
)
status_arr = np.asarray(status, dtype=object)
sl = np.array([str(s).lower() for s in status_arr])
ineg = np.array([labels[0].lower() in s for s in sl])
if len(labels) > 1:
ireg = np.array([labels[1].lower() in s for s in sl])
else:
ireg = ~ineg
neg_mat = x_mat[ineg, :]
x_mat = x_mat[ireg, :]
else:
neg_mat = np.asarray(neg_x, dtype=np.float64)
narrays = x_mat.shape[1]
pi1 = np.empty(narrays, dtype=np.float64)
for i in range(narrays):
b = neg_mat[:, i]
b = b[~np.isnan(b)]
r = x_mat[:, i]
r = r[~np.isnan(r)]
mu = np.mean(b)
alpha = max(np.mean(r) - mu, 10.0)
b1 = float(np.median(b))
# Exponential CDF on (b1 - b) with rate = 1/alpha.
diffs = b1 - b
p1_i = float(np.mean(1.0 - np.exp(-np.clip(diffs, 0, None) / alpha)))
pb_i = float((np.sum(b < b1) + np.sum(b == b1) / 2.0) / len(b))
p_i = float((np.sum(r < b1) + np.sum(r == b1) / 2.0) / len(r))
denom = pb_i - p1_i
pi1[i] = (pb_i - p_i) / denom if denom != 0 else np.nan
pi1 = np.clip(pi1, 0.0, 1.0)
return pi1
def fit_gamma_intercept(y, offset=0.0, maxit: int = 1000) -> float:
"""
Estimate the intercept of an additive gamma GLM.
Port of R limma's ``fitGammaIntercept`` (``fitGammaIntercept.R``).
Iterative root-find on
``sum(y / (offset + x)) - n = 0``.
"""
y = np.asarray(y, dtype=np.float64)
if np.any(y < 0):
raise ValueError("negative y not permitted")
offset = np.asarray(offset, dtype=np.float64)
if np.any(offset < 0):
raise ValueError("offsets must be positive")
r0, r1 = float(np.min(offset)), float(np.max(offset))
if r0 + 1e-14 > r1:
return float(np.mean(y) - r0)
n = y.size
x = 0.0
for _ in range(maxit):
denom = offset + x
Q = float(np.sum(y / denom))
dQ = float(np.sum(y / denom**2))
if dQ == 0:
break
dif = (Q - n) / dQ
x = x + dif
if abs(dif) < 1e-8:
break
return x
def bwss_matrix(x) -> dict:
"""
Between- and within-column sums of squares of a matrix.
Port of R limma's ``bwss.matrix(x)`` (``bwss.R``).
"""
x = np.asarray(x, dtype=np.float64)
if x.ndim != 2:
raise ValueError("x must be a 2-D matrix")
counts = np.sum(~np.isnan(x), axis=0)
keep = counts > 0
x = x[:, keep]
counts = counts[keep]
if counts.size == 0:
return {"bss": np.nan, "wss": np.nan, "bdf": np.nan, "wdf": np.nan}
means = np.nanmean(x, axis=0)
# Sample variance per column (ddof=1); NaN-safe
variances = np.array(
[np.nanvar(x[:, j], ddof=1) if counts[j] > 1 else np.nan for j in range(x.shape[1])]
)
total_n = counts.sum()
grand_mean = np.sum(counts * means) / total_n
wss = float(np.nansum((counts - 1) * variances))
bss = float(np.sum(counts * (means - grand_mean) ** 2))
wdf = int(np.sum(counts - 1))
bdf = int(x.shape[1] - 1)
return {"bss": bss, "wss": wss, "bdf": bdf, "wdf": wdf}