Source code for pylimma.squeeze_var

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
#   squeezeVar.R               Copyright (C) 2004-2025 Gordon Smyth
#   fitFDist.R                 Copyright (C) 2002-2024 Gordon Smyth,
#                                                      Belinda Phipson
#   fitFDistRobustly.R         Copyright (C) 2012-2024 Gordon Smyth,
#                                                      Belinda Phipson
#   fitFDistUnequalDF1.R       Copyright (C) 2024-2025 Gordon Smyth,
#                                                      Lizhong Chen
# Python port: Copyright (C) 2026 John Mulvey
"""
Empirical Bayes variance shrinkage for pylimma.

Implements the core eBayes variance moderation from limma:
- fit_f_dist(): estimate prior variance and degrees of freedom
- squeeze_var(): compute posterior variances
"""

from __future__ import annotations

import warnings

import numpy as np
from scipy import linalg
from scipy.optimize import brentq
from scipy.special import polygamma
from scipy.stats import f as f_dist

from .utils import logmdigamma, trigamma_inverse

# Pre-compute 128-point Gauss-Legendre quadrature nodes and weights
_GAUSS_NODES, _GAUSS_WEIGHTS = np.polynomial.legendre.leggauss(128)
# Transform from [-1, 1] to [0, 1] for uniform distribution
_GAUSS_NODES_UNIFORM = (_GAUSS_NODES + 1) / 2
_GAUSS_WEIGHTS_UNIFORM = _GAUSS_WEIGHTS / 2


def _winsorized_moments(
    df1: float,
    df2: float,
    winsor_tail_p: tuple[float, float],
) -> tuple[float, float]:
    """
    Compute theoretical mean and variance of Winsorized log-F distribution.

    Uses 128-point Gaussian quadrature to integrate over the F-distribution,
    matching R's fitFDistRobustly implementation.

    Parameters
    ----------
    df1 : float
        Numerator degrees of freedom.
    df2 : float
        Denominator degrees of freedom (can be np.inf).
    winsor_tail_p : tuple
        (lower, upper) tail proportions for Winsorization.

    Returns
    -------
    tuple
        (mean, variance) of the Winsorized log-F distribution.
    """
    prob_lower, prob_upper = winsor_tail_p

    # Quantiles of F-distribution at Winsorization points
    if np.isinf(df2):
        # F with df2=Inf is chi-squared(df1)/df1
        from scipy.stats import chi2

        fq_lower = chi2.ppf(prob_lower, df1) / df1
        fq_upper = chi2.ppf(1 - prob_upper, df1) / df1
    else:
        fq_lower = f_dist.ppf(prob_lower, df1, df2)
        fq_upper = f_dist.ppf(1 - prob_upper, df1, df2)

    # Log quantiles (Winsorization bounds for z = log(F))
    zq_lower = np.log(fq_lower)
    zq_upper = np.log(fq_upper)

    # Transform F quantiles using link function: q = f/(1+f)
    # This maps (0, inf) to (0, 1) for better numerical integration
    def linkfun(f):
        return f / (1 + f)

    def linkinv(q):
        return q / (1 - q)

    q_lower = linkfun(fq_lower)
    q_upper = linkfun(fq_upper)

    # Quadrature nodes in transformed space [q_lower, q_upper]
    q_range = q_upper - q_lower
    nodes_q = q_lower + q_range * _GAUSS_NODES_UNIFORM
    nodes_f = linkinv(nodes_q)
    nodes_z = np.log(nodes_f)

    # F-distribution density at nodes, with Jacobian for transformation
    # d/dq[linkinv(q)] = 1/(1-q)^2
    if np.isinf(df2):
        from scipy.stats import chi2

        # F = chi2/df1, so pdf_F(f) = df1 * pdf_chi2(f*df1)
        pdf_f = df1 * chi2.pdf(nodes_f * df1, df1)
    else:
        pdf_f = f_dist.pdf(nodes_f, df1, df2)

    # Jacobian: df/dq = 1/(1-q)^2
    jacobian = 1 / (1 - nodes_q) ** 2
    integrand_weight = pdf_f * jacobian

    # Compute mean: integral of z*f(z) over middle + contributions from tails
    # Middle part via quadrature
    middle_mean = q_range * np.sum(_GAUSS_WEIGHTS_UNIFORM * integrand_weight * nodes_z)
    # Tail contributions (Winsorized values times tail probabilities)
    tail_mean = zq_lower * prob_lower + zq_upper * prob_upper
    mean = middle_mean + tail_mean

    # Compute variance: E[(z-mean)^2]
    middle_var = q_range * np.sum(_GAUSS_WEIGHTS_UNIFORM * integrand_weight * (nodes_z - mean) ** 2)
    tail_var = (zq_lower - mean) ** 2 * prob_lower + (zq_upper - mean) ** 2 * prob_upper
    var = middle_var + tail_var

    return mean, var


def _natural_spline_basis(
    x: np.ndarray,
    df: int,
    intercept: bool = True,
    boundary_knots: tuple | None = None,
    interior_knots: np.ndarray | None = None,
    return_knots: bool = False,
):
    """
    Create natural cubic spline basis matrix matching R's splines::ns().

    Uses B-spline basis with natural boundary constraints (second derivative = 0
    at boundaries), matching R's implementation exactly.

    Parameters
    ----------
    x : ndarray
        Covariate values.
    df : int
        Degrees of freedom (number of basis functions).
    intercept : bool
        Whether to include intercept column.
    boundary_knots, interior_knots : optional
        Pre-computed knots. When supplied, the basis is evaluated at
        ``x`` using those knots (matching R's
        ``predict(ns_obj, newx=...)`` semantics). When None, knots are
        derived from ``x`` itself.
    return_knots : bool
        When True, also return the (boundary_knots, interior_knots)
        tuple alongside the basis so callers can re-evaluate at new x.

    Returns
    -------
    ndarray (or (ndarray, knots) tuple)
        Spline basis matrix of shape (len(x), df).
    """
    from scipy.interpolate import BSpline

    x = np.asarray(x, dtype=np.float64)
    n = len(x)

    if df < 1:
        raise ValueError("df must be at least 1")

    # For df=1, just return intercept or constant
    if df == 1:
        basis = np.ones((n, 1))
        if return_knots:
            return basis, ((x.min(), x.max()), np.array([]))
        return basis

    # Number of interior knots (matching R's ns logic)
    n_interior = df - 1 - int(intercept)
    if n_interior < 0:
        n_interior = 0

    # Boundary knots: derive from x unless caller supplied them.
    if boundary_knots is None:
        boundary = (x.min(), x.max())
    else:
        boundary = boundary_knots

    # Handle edge case where all x values are identical
    if boundary[0] == boundary[1]:
        basis = np.ones((n, 1)) if df == 1 else np.column_stack([np.ones(n), np.zeros(n)])[:, :df]
        if return_knots:
            return basis, (boundary, np.array([]))
        return basis

    # Interior knots: derive from x unless caller supplied them.
    if interior_knots is not None:
        interior_knots = np.asarray(interior_knots, dtype=np.float64)
    elif n_interior > 0:
        knot_probs = np.linspace(0, 1, n_interior + 2)[1:-1]
        interior_knots = np.quantile(x, knot_probs)
    else:
        interior_knots = np.array([])

    # Augmented knot vector: boundary repeated 4 times (for cubic splines, order=4)
    # plus interior knots, matching R's: sort(c(rep(Boundary.knots, 4), knots))
    knots = np.sort(
        np.concatenate([np.repeat(boundary[0], 4), interior_knots, np.repeat(boundary[1], 4)])
    )

    # Number of B-spline basis functions
    n_basis = len(knots) - 4

    # Create B-spline basis matrix
    basis = np.zeros((n, n_basis))
    for i in range(n_basis):
        c = np.zeros(n_basis)
        c[i] = 1.0
        spline = BSpline(knots, c, k=3, extrapolate=True)
        basis[:, i] = spline(x)

    # Compute constraint matrix: second derivatives at boundary knots
    # For natural splines, second derivative must be 0 at boundaries
    const = np.zeros((2, n_basis))
    for i in range(n_basis):
        c = np.zeros(n_basis)
        c[i] = 1.0
        spline = BSpline(knots, c, k=3)
        spline_d2 = spline.derivative(2)
        const[0, i] = spline_d2(boundary[0])
        const[1, i] = spline_d2(boundary[1])

    # Remove intercept column before applying constraint if needed
    # (matching R's order of operations)
    if not intercept:
        const = const[:, 1:]
        basis = basis[:, 1:]

    # Apply natural spline constraint via QR decomposition
    # This matches R's: (t(qr.qty(qr.const, t(basis))))[, -(1:2)]
    Q, _ = linalg.qr(const.T, mode="full")
    transformed = Q.T @ basis.T
    basis_final = transformed.T[:, 2:]  # Drop first 2 columns (constrained directions)

    if return_knots:
        return basis_final, (boundary, interior_knots)
    return basis_final


def _fit_spline_trend(e: np.ndarray, covariate: np.ndarray, splinedf: int) -> tuple:
    """
    Fit spline trend to adjusted log-variances.

    Parameters
    ----------
    e : ndarray
        Adjusted log-variances (log(x) + logmdigamma(df1/2)).
    covariate : ndarray
        Covariate values (e.g., mean expression).
    splinedf : int
        Spline degrees of freedom.

    Returns
    -------
    tuple
        (fitted_values, residual_variance, coefficients, design,
        spline_knots). ``spline_knots`` is ``(boundary, interior)`` or
        None when the linear fallback fired; callers can replay the
        basis at new x via ``_natural_spline_basis(...,
        boundary_knots=..., interior_knots=...)``.
    """
    n = len(e)

    # Create spline basis. Capture the knots so callers can re-evaluate
    # the basis at new covariate points (matching R's
    # `predict(design, newx=...)` in fitFDist.R:90-97).
    spline_knots = None
    try:
        design, spline_knots = _natural_spline_basis(
            covariate, df=splinedf, intercept=True, return_knots=True
        )
    except Exception:
        # Fall back to simple linear fit
        design = np.column_stack([np.ones(n), covariate])

    # Fit linear model
    q, r = linalg.qr(design, mode="economic")
    rank = np.sum(np.abs(np.diag(r)) > 1e-10)

    # Solve for coefficients
    qty = q.T @ e
    coef = linalg.solve_triangular(r[:rank, :rank], qty[:rank])

    # Fitted values
    fitted = design[:, :rank] @ coef

    # Residual variance. With economic QR, effects has only `p` entries, so
    # residual SS is ||e||^2 - ||qty||^2 (the portion of e outside the column
    # space of design). Divide by n - rank (R's df.residual convention).
    df_resid = n - rank
    if df_resid > 0:
        residual_ss = np.sum(e**2) - np.sum(qty**2)
        # Guard against negative from floating-point noise near zero
        residual_var = max(residual_ss, 0.0) / df_resid
    else:
        residual_var = 0.0

    return fitted, residual_var, coef, design, spline_knots


[docs] def fit_f_dist( x: np.ndarray, df1: np.ndarray | float, covariate: np.ndarray | None = None, ) -> dict: """ Fit a scaled F-distribution to sample variances. Estimates the scale factor (prior variance) and denominator degrees of freedom (prior df) by the method of moments on log(F). Port of R limma's fitFDist function (Gordon Smyth). Parameters ---------- x : array_like Sample variances. Should be positive. df1 : array_like or float Numerator degrees of freedom for each variance (residual df). covariate : array_like, optional If provided, allows the scale to vary as a function of the covariate (e.g., mean expression). Uses natural cubic splines to fit the trend. Returns ------- dict scale : float or ndarray Estimated prior variance (s0^2). If covariate is provided, this is an array of gene-specific prior variances. df2 : float Estimated prior degrees of freedom (d0). Notes ----- Uses the relationship that if s^2 ~ s0^2 * F(d1, d0), then E[log(s^2)] = log(s0^2) + digamma(d1/2) - digamma(d0/2) + log(d0/d1) and Var[log(s^2)] = trigamma(d1/2) + trigamma(d0/2). When covariate is provided, the scale (s0^2) is allowed to vary as a smooth function of the covariate, typically the average log-expression. References ---------- Smyth, G. K. (2004). Linear models and empirical Bayes methods for assessing differential expression in microarray experiments. Statistical Applications in Genetics and Molecular Biology, 3(1), Article 3. """ x = np.asarray(x, dtype=np.float64) n = len(x) if n == 0: return {"scale": np.nan, "df2": np.nan} if n == 1: return {"scale": float(x[0]), "df2": 0.0} # Handle df1. R fitFDist.R:13-19 checks scalar df1 validity BEFORE # broadcasting; an invalid scalar returns NA/NA immediately rather # than falling through to per-element handling with an all-False # ok mask. df1 = np.asarray(df1, dtype=np.float64) if df1.ndim == 0: if not (np.isfinite(df1) and df1 > 1e-15): return {"scale": np.nan, "df2": np.nan} df1 = np.full(n, float(df1)) elif len(df1) != n: # R fitFDist.R:21: `if(length(df1) != n) stop(...)`. Without # this check pylimma falls through to a numpy IndexError. raise ValueError("x and df1 have different lengths") # Check covariate if covariate is not None: covariate = np.asarray(covariate, dtype=np.float64) if len(covariate) != n: raise ValueError("x and covariate must be of same length") if np.any(np.isnan(covariate)): raise ValueError("NA covariate values not allowed") # Handle infinite covariate values finite_cov = np.isfinite(covariate) if not np.all(finite_cov): if np.any(finite_cov): cov_range = (np.min(covariate[finite_cov]), np.max(covariate[finite_cov])) covariate = covariate.copy() covariate[covariate == -np.inf] = cov_range[0] - 1 covariate[covariate == np.inf] = cov_range[1] + 1 else: covariate = np.sign(covariate) # Check for valid df1 ok = np.isfinite(df1) & (df1 > 1e-15) if df1.size == 1 and not ok[0]: return {"scale": np.nan, "df2": np.nan} # Remove invalid values ok = ok & np.isfinite(x) & (x > -1e-15) nok = np.sum(ok) notallok = nok < n if nok == 1: return {"scale": float(x[ok][0]), "df2": 0.0} # Store indices for later expansion if notallok: x = x[ok] if len(df1) > 1: df1 = df1[ok] if covariate is not None: covariate_notok = covariate[~ok] covariate = covariate[ok] # Determine spline df for trend if covariate is not None: # Auto-select spline df based on sample size (like R) splinedf = 1 + int(nok >= 3) + int(nok >= 6) + int(nok >= 30) splinedf = min(splinedf, len(np.unique(covariate))) # If covariate has too few unique values, recall without covariate if splinedf < 2: result = fit_f_dist(x=x, df1=df1, covariate=None) result["scale"] = np.full(n, result["scale"]) return result # Avoid exactly zero variances x = np.maximum(x, 0.0) m = np.median(x) if m == 0: warnings.warn("More than half of residual variances are exactly zero: eBayes unreliable") m = 1.0 elif np.any(x == 0): warnings.warn("Zero sample variances detected, have been offset away from zero") x = np.maximum(x, 1e-5 * m) # Work on log scale z = np.log(x) e = z + logmdigamma(df1 / 2) if covariate is None: # Simple mean emean = np.mean(e) evar = np.var(e, ddof=1) else: # Fit spline trend emean, evar, coef, design, spline_knots = _fit_spline_trend(e, covariate, splinedf) # Expand emean to full length if needed. Reuse the spline_knots # captured during the fit so the basis is evaluated at the # not-ok covariate points using the SAME knots - matching R's # `design2 <- predict(design, newx=covariate.notok)` # (fitFDist.R:91). Rebuilding the basis from # ``covariate_notok`` would derive new boundary/interior # knots and yield a different function. if notallok: emean_full = np.zeros(n) emean_full[ok] = emean try: if spline_knots is not None: design_notok = _natural_spline_basis( covariate_notok, df=splinedf, intercept=True, boundary_knots=spline_knots[0], interior_knots=spline_knots[1], ) else: # Linear fallback path: replay [1, x] design. design_notok = np.column_stack([np.ones(len(covariate_notok)), covariate_notok]) emean_full[~ok] = design_notok[:, : len(coef)] @ coef except Exception: # Fall back to nearest neighbor or mean emean_full[~ok] = np.mean(emean) emean = emean_full # Estimate scale and df2 evar = evar - np.mean(polygamma(1, df1 / 2)) # subtract trigamma # R fitFDist.R: NaN evar (e.g. all-NaN var input) propagates as NA # df2 / NA scale; the `if(any(x==0))` and `if(evar > 0)` branches # raise "missing value where TRUE/FALSE needed" downstream. Mirror # that signal here so the caller's `anyNA(df.prior)` check fires. if np.isnan(evar): return {"scale": np.nan, "df2": np.nan} if evar > 0: df2 = 2 * trigamma_inverse(evar) s20 = np.exp(emean - logmdigamma(df2 / 2)) else: df2 = np.inf if covariate is None: # Use simple pooled variance (MLE when df2 is infinite) s20 = np.mean(x) else: # Use trend-based estimate s20 = np.exp(emean) # Return appropriate type for scale if covariate is None: return {"scale": float(s20), "df2": float(df2)} else: return {"scale": s20, "df2": float(df2)}
[docs] def fit_f_dist_robustly( x: np.ndarray, df1: np.ndarray | float, covariate: np.ndarray | None = None, winsor_tail_p: tuple[float, float] = (0.05, 0.1), trace: bool = False, ) -> dict: """ Robust estimation of scaled F-distribution parameters. Estimates the scale factor and denominator degrees of freedom using Winsorized moments of log(F) values, which provides robustness to outlier variances. Parameters ---------- x : array_like Sample variances. Should be positive. df1 : array_like or float Numerator degrees of freedom for each variance. covariate : array_like, optional If provided, allows the scale to vary as a function of the covariate. Not yet fully implemented. winsor_tail_p : tuple of float, default (0.05, 0.1) Lower and upper tail proportions for Winsorization. Returns ------- dict scale : float or ndarray Estimated prior variance (s0^2). df2 : float Estimated prior degrees of freedom (d0). df2_shrunk : ndarray Gene-wise shrunken prior df, accounting for outliers. Notes ----- This function is more robust than fit_f_dist() when there are outlier variances. It uses Winsorization to limit the influence of extreme values on the moment estimates. The df2_shrunk values are shrunk towards the pooled df for genes identified as potential outliers (having unusually large variances). References ---------- Phipson, B. and Smyth, G. K. (2016). Robust hyperparameter estimation protects against hypervariable genes and improves power to detect differential expression. Annals of Applied Statistics, 10(2), 946-963. """ x = np.asarray(x, dtype=np.float64) n = len(x) # Handle edge cases if n < 2: return {"scale": np.nan, "df2": np.nan, "df2_shrunk": np.full(n, np.nan)} if n == 2: result = fit_f_dist(x=x, df1=df1, covariate=covariate) return { "scale": result["scale"], "df2": result["df2"], "df2_shrunk": np.full(n, result["df2"]), } # Handle df1 df1 = np.asarray(df1, dtype=np.float64) if df1.ndim == 0: df1 = np.full(n, float(df1)) # Filter valid observations ok = ~np.isnan(x) & np.isfinite(df1) & (df1 > 1e-6) if not np.all(ok): # Recurse on valid subset x_ok = x[ok] df1_ok = df1[ok] if len(df1) > 1 else df1 cov_ok = covariate[ok] if covariate is not None else None sub_fit = fit_f_dist_robustly( x=x_ok, df1=df1_ok, covariate=cov_ok, winsor_tail_p=winsor_tail_p, trace=trace ) # Expand results df2_shrunk = np.full(n, sub_fit["df2"]) df2_shrunk[ok] = sub_fit["df2_shrunk"] if covariate is None: scale = sub_fit["scale"] else: scale = np.full(n, np.nan) scale[ok] = sub_fit["scale"] # Interpolate for non-ok values. R uses approxfun(rule=2): # linear interpolation within range, boundary-value clamp outside. if isinstance(sub_fit["scale"], np.ndarray): from scipy.interpolate import interp1d x_ok = covariate[ok] y_ok = np.log(sub_fit["scale"]) sort_idx = np.argsort(x_ok, kind="stable") x_sorted = x_ok[sort_idx] y_sorted = y_ok[sort_idx] f = interp1d( x_sorted, y_sorted, kind="linear", bounds_error=False, fill_value=(y_sorted[0], y_sorted[-1]), ) scale[~ok] = np.exp(f(covariate[~ok])) return {"scale": scale, "df2": sub_fit["df2"], "df2_shrunk": df2_shrunk} # Avoid zero or negative variances m = np.median(x) if m <= 0: raise ValueError("Variances are mostly <= 0") x = np.maximum(x, m * 1e-12) # Get non-robust estimates as baseline non_robust = fit_f_dist(x=x, df1=df1, covariate=covariate) # Check winsorization proportions prob_lower = winsor_tail_p[0] prob_upper = winsor_tail_p[1] if prob_lower < 1 / n and prob_upper < 1 / n: return { "scale": non_robust["scale"], "df2": non_robust["df2"], "df2_shrunk": np.full(n, non_robust["df2"]), } # Work with log scale z = np.log(x) # De-mean (or de-trend if covariate provided) if covariate is None: # Trimmed mean - R's mean(z, trim=t) trims t proportion from BOTH ends # Use winsor.tail.p[2] (prob_upper) as the trim proportion trim_frac = prob_upper lo = int(np.floor(n * trim_frac)) hi = n - lo z_sorted = np.sort(z) z_trimmed = z_sorted[lo:hi] if lo < hi else z_sorted z_trend = np.mean(z_trimmed) z_resid = z - z_trend else: # Use LOWESS for trend. R calls `limma::loessFit(z, covariate, span=0.4)` # which dispatches to base R's `lowess()` when no weights are supplied. # R's lowess() defaults `delta = 0.01 * diff(range(x))` (skips # computation at nearby points); statsmodels defaults delta=0, which # disagrees with R by up to ~3e-3. Pass R's default explicitly. try: from statsmodels.nonparametric.smoothers_lowess import lowess delta = 0.01 * (np.max(covariate) - np.min(covariate)) smoothed = lowess(z, covariate, frac=0.4, delta=delta, return_sorted=False) z_trend = smoothed z_resid = z - z_trend except ImportError: warnings.warn("statsmodels not available for LOWESS; using simple mean") z_trend = np.mean(z) z_resid = z - z_trend # Winsorize residuals q_lower = np.quantile(z_resid, prob_lower) q_upper = np.quantile(z_resid, 1 - prob_upper) z_wins = np.clip(z_resid, q_lower, q_upper) # Moments of Winsorized residuals z_wins_mean = np.mean(z_wins) z_wins_var = np.var(z_wins, ddof=1) if trace: print(f"Variance of Winsorized Fisher-z: {z_wins_var}") # Use constant df1 (take max if variable) if len(np.unique(df1)) > 1: df1_use = np.max(df1) else: df1_use = df1[0] if isinstance(df1, np.ndarray) else df1 # Check if df2=Inf fits the data using Gaussian quadrature # Compute theoretical Winsorized variance for df2=Inf mom_inf = _winsorized_moments(df1_use, np.inf, (prob_lower, prob_upper)) fun_val_inf = np.log(z_wins_var / mom_inf[1]) if fun_val_inf <= 0: # df2 is effectively infinite df2 = np.inf # Correct trend for bias using theoretical Winsorized mean z_trend_corrected = z_trend + z_wins_mean - mom_inf[0] s20 = np.exp(z_trend_corrected) # Identify outliers f_stat = np.exp(z - z_trend_corrected) from scipy.stats import chi2 tail_p = chi2.sf(f_stat * df1_use, df1_use) # Empirical tail probability r = np.argsort(np.argsort(f_stat)[::-1]) + 1 # rank from largest emp_tail_p = (r - 0.5) / n # Probability of not being an outlier prob_not_outlier = np.minimum(tail_p / emp_tail_p, 1.0) # Shrink df for outliers df_pooled = n * df1_use df2_shrunk = np.full(n, df2) outlier_mask = prob_not_outlier < 1 if np.any(outlier_mask): df2_shrunk[outlier_mask] = prob_not_outlier[outlier_mask] * df_pooled # Make monotonic order = np.argsort(tail_p) df2_ordered = df2_shrunk[order] df2_shrunk[order] = np.maximum.accumulate(df2_ordered) return { "scale": s20, "df2": df2, "df2_shrunk": df2_shrunk, "tail_p_value": tail_p, # R parity: diagnostic return } # Estimate df2 by matching Winsorized variance using root-finding # Start from non-robust estimate as lower bound if not np.isfinite(non_robust["df2"]): return { "scale": non_robust["scale"], "df2": non_robust["df2"], "df2_shrunk": np.full(n, non_robust["df2"]), } # Link function to map df2 from (0, inf) to (0, 1) for root-finding def linkfun(x): return x / (1 + x) def linkinv(x): return x / (1 - x) # Objective function: log(observed_var / theoretical_var) # We want to find df2 where this equals zero def objective(x_transformed): df2_try = linkinv(x_transformed) mom = _winsorized_moments(df1_use, df2_try, (prob_lower, prob_upper)) return np.log(z_wins_var / mom[1]) # Use non-robust estimate as lower bound x_lower = linkfun(non_robust["df2"]) fun_val_lower = objective(x_lower) if fun_val_lower >= 0: # Non-robust estimate already satisfies the constraint df2 = non_robust["df2"] else: # Root is between non-robust estimate and infinity # Use brentq for root-finding try: x_root = brentq(objective, x_lower, 1.0 - 1e-10, xtol=1e-8) df2 = linkinv(x_root) except ValueError: # Fallback if root-finding fails df2 = non_robust["df2"] # Compute scale using corrected trend mom = _winsorized_moments(df1_use, df2, (prob_lower, prob_upper)) z_trend_corrected = z_trend + z_wins_mean - mom[0] s20 = np.exp(z_trend_corrected) # Outlier detection and df shrinkage f_stat = np.exp(z - z_trend_corrected) tail_p = f_dist.sf(f_stat, df1_use, df2) # Empirical tail probability r = np.argsort(np.argsort(f_stat)[::-1]) + 1 emp_tail_p = (r - 0.5) / n # Probability of being an outlier log_tail_p = np.log(np.maximum(tail_p, 1e-300)) log_emp_tail_p = np.log(emp_tail_p) log_prob_not_outlier = np.minimum(log_tail_p - log_emp_tail_p, 0) prob_not_outlier = np.exp(log_prob_not_outlier) prob_outlier = 1 - prob_not_outlier # Compute df2 for outliers if np.any(log_prob_not_outlier < 0): min_log_tail_p = np.min(log_tail_p) if min_log_tail_p == -np.inf: df2_outlier = 0 df2_shrunk = prob_not_outlier * df2 else: # df2_outlier makes max F-stat the median of the distribution df2_outlier = np.log(0.5) / min_log_tail_p * df2 # Iterate for accuracy (matching R's refinement step) new_log_tail_p = f_dist.logsf(np.max(f_stat), df1_use, df2_outlier) df2_outlier = np.log(0.5) / new_log_tail_p * df2_outlier df2_outlier = max(df2_outlier, 0) df2_shrunk = prob_not_outlier * df2 + prob_outlier * df2_outlier # Make monotonic in tail p-value order = np.argsort(log_tail_p) df2_ordered = df2_shrunk[order] # Cumulative minimum from smallest tail p cum_mean = np.cumsum(df2_ordered) / np.arange(1, n + 1) i_min = np.argmin(cum_mean) df2_ordered[: i_min + 1] = cum_mean[i_min] df2_shrunk[order] = np.maximum.accumulate(df2_ordered) else: df2_shrunk = np.full(n, df2) return { "scale": s20, "df2": df2, "df2_shrunk": df2_shrunk, "tail_p_value": tail_p, # R parity: diagnostic return }
[docs] def fit_f_dist_unequal_df1( x: np.ndarray, df1: np.ndarray, covariate: np.ndarray | None = None, span: float | None = None, robust: bool = False, prior_weights: np.ndarray | None = None, ) -> dict: """ Fit scaled F-distribution with unequal df1 values. Robust estimation of the parameters of a scaled F-distribution when df1 varies substantially between observations. Uses maximum likelihood estimation with inverse-trigamma weighting. This version is preferred when df1 values are small or vary across genes, such as from edgeR quasi-likelihood pipelines. Parameters ---------- x : array_like Sample variances. Should be positive. df1 : array_like Numerator degrees of freedom for each variance (can vary per gene). covariate : array_like, optional If provided, allows the scale to vary as a function of the covariate. span : float, optional LOWESS span for covariate trend. If None, chosen automatically. robust : bool, default False Use FDR-based outlier detection and re-weighting. prior_weights : array_like, optional Prior weights for each observation. Returns ------- dict scale : float or ndarray Estimated prior variance (s0^2). df2 : float Estimated prior degrees of freedom (d0). df2_shrunk : ndarray, optional Gene-wise shrunken df2 (only if robust=True and outliers found). Notes ----- Key differences from fit_f_dist: - Uses inverse-trigamma weighting: w = 1/trigamma(df1/2) - Uses maximum likelihood optimization instead of method of moments - Better handles small or varying df1 values References ---------- Smyth, G. K. and Chen, L. (2024). limma package source code. """ from scipy.optimize import minimize_scalar from scipy.special import gammaln x = np.asarray(x, dtype=np.float64) df1 = np.asarray(df1, dtype=np.float64) n = len(x) # Validate inputs if df1.ndim == 0: df1 = np.full(n, float(df1)) if len(df1) != n: raise ValueError("x and df1 are different lengths") if np.any(np.isnan(df1)): raise ValueError("NA df1 values") # Check covariate if covariate is not None: covariate = np.asarray(covariate, dtype=np.float64) if len(covariate) != n: raise ValueError("x and covariate are different lengths") if np.any(np.isnan(covariate)): raise ValueError("covariate contains NA values") # Check prior_weights if prior_weights is not None: prior_weights = np.asarray(prior_weights, dtype=np.float64) if len(prior_weights) != n: raise ValueError("x and prior_weights are different lengths") if np.any(np.isnan(prior_weights)): raise ValueError("prior_weights contain NA values") if np.any(prior_weights < 0): raise ValueError("prior_weights are negative") # Handle NAs in x if np.any(np.isnan(x)): na_mask = np.isnan(x) if prior_weights is None: prior_weights = (~na_mask).astype(np.float64) else: prior_weights = prior_weights.copy() prior_weights[na_mask] = 0 x = x.copy() x[na_mask] = 0 # Treat small df1 values as uninformative if np.min(df1) < 0.01: small_df1 = df1 < 0.01 if prior_weights is None: prior_weights = (~small_df1).astype(np.float64) else: prior_weights = prior_weights.copy() prior_weights[small_df1] = 0 df1 = df1.copy() df1[small_df1] = 1 has_prior_weights = prior_weights is not None # Check for informative observations informative = x > 0 if has_prior_weights: informative = informative & (prior_weights > 0) n_informative = np.sum(informative) if n_informative < 2: return {"scale": np.nan, "df2": np.nan} if n_informative == 2: covariate = None robust = False prior_weights = None has_prior_weights = False # Avoid exactly zero x values m = np.median(x[informative]) xpos = np.maximum(x, 1e-12 * m) # Work on log(F) scale z = np.log(xpos) # Average log(F) adjusted for df1 d1 = df1 / 2 e = z + logmdigamma(d1) # Inverse trigamma weights w = 1.0 / polygamma(1, d1) if has_prior_weights: w = w * prior_weights # Compute emean (mean or trend) if covariate is None: emean = np.sum(w * e) / np.sum(w) else: from .utils import choose_lowess_span, loess_fit if span is None: span = choose_lowess_span(n, small_n=500) # Normalize weights for LOWESS w_norm = w / np.quantile(w, 0.75) w_norm = np.clip(w_norm, 1e-8, 1e2) fit = loess_fit(e, covariate, weights=w_norm, span=span, iterations=1) emean = fit["fitted"] # Maximum likelihood optimization # Reparameterize: par = d2/(1+d2), so d2 = par/(1-par) d1x = d1 * xpos def neg_twice_log_lik(par): if par <= 0 or par >= 1: return np.inf d2 = par / (1 - par) d2s20 = d2 * np.exp(emean - logmdigamma(d2)) # Ensure d2s20 is positive d2s20 = np.maximum(d2s20, 1e-100) # Log-likelihood terms ll = ( -(d1 + d2) * np.log1p(d1x / d2s20) - d1 * np.log(d2s20) + gammaln(d1 + d2) - gammaln(d2) ) if has_prior_weights: ll = prior_weights * ll return -2 * np.sum(ll) # Optimize (search in range corresponding to df2 from 1 to 5000) # Use R's default tolerance: .Machine$double.eps^0.25 # This matters when the likelihood surface is flat (large df2) r_tol = np.finfo(float).eps ** 0.25 result = minimize_scalar( neg_twice_log_lik, bounds=(0.5, 0.9998), method="bounded", options={"xatol": r_tol} ) par_opt = result.x d2 = par_opt / (1 - par_opt) s20 = np.exp(emean - logmdigamma(d2)) df2 = 2 * d2 # Return non-robust result if robust=False if not robust: return {"scale": s20, "df2": df2} # Robust mode: FDR-based outlier detection from scipy.stats import f as f_dist from .utils import p_adjust f_stat = x / s20 right_p = f_dist.sf(f_stat, df1, df2) left_p = 1 - right_p # Handle very small left_p small_left = left_p < 0.001 if np.any(small_left): left_p[small_left] = f_dist.cdf(f_stat[small_left], df1[small_left], df2) two_sided_p = 2 * np.minimum(left_p, right_p) fdr = p_adjust(two_sided_p, method="BH") fdr[fdr > 0.3] = 1.0 # If no outliers, return non-robust estimates if np.min(fdr) == 1: return {"scale": s20, "df2": df2} # Refit with FDR as prior weights refit = fit_f_dist_unequal_df1( x=x, df1=df1, covariate=covariate, span=span, robust=False, prior_weights=fdr ) s20 = refit["scale"] df2 = refit["df2"] # Identify right outliers using QQ-type method r = np.argsort(np.argsort(f_stat)[::-1]) + 1 # rank from largest uniform_p = (n - r + 0.5) / n prob_not_outlier = np.minimum(right_p / uniform_p, 1.0) # If no right outliers, return robust estimates without df2 shrinkage if np.min(prob_not_outlier) == 1: return {"scale": s20, "df2": df2} # Compute shrunk df2 for outliers i_min = np.argmin(right_p) min_right_p = right_p[i_min] if min_right_p == 0: df2_outlier = 0.0 df2_shrunk = prob_not_outlier * df2 else: # Find df2_outlier to make max F-stat the median of distribution df2_outlier = np.log(0.5) / np.log(min_right_p) * df2 # Iterate for accuracy new_log_right_p = f_dist.logsf(f_stat[i_min], df1[i_min], df2_outlier) if new_log_right_p != 0: df2_outlier = np.log(0.5) / new_log_right_p * df2_outlier df2_shrunk = prob_not_outlier * df2 + (1 - prob_not_outlier) * df2_outlier # Force df2_shrunk to be monotonic in right_p order = np.argsort(right_p) df2_ordered = df2_shrunk[order] m_cumsum = np.cumsum(df2_ordered) / np.arange(1, n + 1) i_min_cumsum = np.argmin(m_cumsum) df2_ordered[: i_min_cumsum + 1] = m_cumsum[i_min_cumsum] df2_shrunk[order] = np.maximum.accumulate(df2_ordered) return {"scale": s20, "df2": df2, "df2_outlier": df2_outlier, "df2_shrunk": df2_shrunk}
def _squeeze_var_core( var: np.ndarray, df: np.ndarray, var_prior: float | np.ndarray, df_prior: float | np.ndarray, ) -> np.ndarray: """ Compute posterior variances given hyperparameters. Internal function implementing the Bayesian shrinkage formula. Parameters ---------- var : ndarray Sample variances. df : ndarray Residual degrees of freedom. var_prior : float or ndarray Prior variance (s0^2). df_prior : float or ndarray Prior degrees of freedom (d0). Returns ------- ndarray Posterior variances. """ var = np.asarray(var, dtype=np.float64) df = np.asarray(df, dtype=np.float64) df_prior = np.asarray(df_prior, dtype=np.float64) var_prior = np.asarray(var_prior, dtype=np.float64) n = len(var) # Broadcast df if scalar if df.ndim == 0: df = np.full(n, float(df)) # Check if all df_prior are finite if np.all(np.isfinite(df_prior)): # Standard shrinkage formula return (df * var + df_prior * var_prior) / (df + df_prior) # Handle case where some or all df_prior are infinite if var_prior.ndim == 0: var_post = np.full(n, float(var_prior)) else: var_post = var_prior.copy() # Check if all df_prior are effectively infinite if np.all(df_prior > 1e100): return var_post # Mixed case: some finite, some infinite finite_mask = np.isfinite(df_prior) if np.any(finite_mask): df_i = df[finite_mask] if len(df) > 1 else df df_prior_i = df_prior[finite_mask] if df_prior.ndim > 0 else df_prior var_post[finite_mask] = (df_i * var[finite_mask] + df_prior_i * var_post[finite_mask]) / ( df_i + df_prior_i ) return var_post
[docs] def squeeze_var( var: np.ndarray, df: np.ndarray | float, covariate: np.ndarray | None = None, span: float | None = None, robust: bool = False, winsor_tail_p: tuple[float, float] = (0.05, 0.1), legacy: bool | None = None, ) -> dict: """ Empirical Bayes posterior variances. Squeeze sample variances towards a common prior using empirical Bayes. This is the core variance moderation step in eBayes. Parameters ---------- var : array_like Sample variances. df : array_like or float Residual degrees of freedom for each variance. covariate : array_like, optional Covariate for mean-variance trend (e.g., average log-expression). When provided, the prior variance is allowed to vary as a smooth function of the covariate. span : float, optional Span for lowess smoothing when fitting the mean-variance trend. Only used when legacy=False or when span is explicitly provided. If None, an appropriate span is chosen automatically. robust : bool, default False Use robust estimation of hyperparameters. winsor_tail_p : tuple of float, default (0.05, 0.1) Winsorization proportions for robust estimation. legacy : bool, optional If True, use the original limma hyperparameter estimation method (fit_f_dist or fit_f_dist_robustly). If False, use the newer method (fit_f_dist_unequal_df1) which handles unequal df1 values better. If None (default), auto-detect: use legacy=True if all df are equal, legacy=False otherwise. Setting span explicitly forces legacy=False. Returns ------- dict var_post : ndarray Posterior variances. var_prior : float or ndarray Prior variance (s0^2). df_prior : float Prior degrees of freedom (d0). Notes ----- The posterior variance is a weighted average of the sample variance and prior variance: var_post = (df * var + df_prior * var_prior) / (df + df_prior) References ---------- Smyth, G. K. (2004). Linear models and empirical Bayes methods for assessing differential expression in microarray experiments. Statistical Applications in Genetics and Molecular Biology, 3(1), Article 3. """ var = np.asarray(var, dtype=np.float64) n = len(var) if n == 0: raise ValueError("var is empty") # Not enough observations for empirical Bayes if n < 3: return {"var_post": var.copy(), "var_prior": var.copy(), "df_prior": 0.0} df = np.asarray(df, dtype=np.float64) if df.ndim == 0: df = np.full(n, float(df)) # Guard against missing/infinite values when df==0 if len(df) > 1: var = var.copy() var[df == 0] = 0.0 # span is only implemented for new hyperparameter function if span is not None: legacy = False # Choose legacy or new hyperparameter method depending on whether # df are unequal. R squeezeVar.R:23-26: when df has no positive # entries, `identical(min(empty), max(empty))` is `identical(Inf, # -Inf)` which is FALSE, so R routes to the unequal-df1 fitter # (which then errors with "Could not estimate prior df"). if legacy is None: df_pos = df[df > 0] if len(df_pos) > 0: legacy = np.min(df_pos) == np.max(df_pos) else: legacy = False # Estimate hyperparameters if legacy: if robust: fit = fit_f_dist_robustly(var, df1=df, covariate=covariate, winsor_tail_p=winsor_tail_p) df_prior = fit["df2"] df_prior_shrunk = fit["df2_shrunk"] else: fit = fit_f_dist(var, df1=df, covariate=covariate) df_prior = fit["df2"] df_prior_shrunk = None else: fit = fit_f_dist_unequal_df1(var, df1=df, covariate=covariate, span=span, robust=robust) df_prior_shrunk = fit.get("df2_shrunk") df_prior = df_prior_shrunk if df_prior_shrunk is not None else fit["df2"] if np.isscalar(df_prior) and np.isnan(df_prior): raise ValueError("Could not estimate prior df") elif not np.isscalar(df_prior) and np.any(np.isnan(df_prior)): raise ValueError("Could not estimate prior df") # Compute posterior variances if robust and df_prior_shrunk is not None: # Use gene-wise shrunken df_prior for outlier robustness # R's squeezeVar returns df2.shrunk as df.prior in robust mode var_post = _squeeze_var_core( var=var, df=df, var_prior=fit["scale"], df_prior=df_prior_shrunk ) return { "df_prior": df_prior_shrunk, # Per-gene values, matching R "var_prior": fit["scale"], "var_post": var_post, } else: var_post = _squeeze_var_core(var=var, df=df, var_prior=fit["scale"], df_prior=df_prior) return {"df_prior": df_prior, "var_prior": fit["scale"], "var_post": var_post}