Source code for pylimma.lmfit

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
#   lmfit.R (lmFit, lm.series, gls.series)
#                              Copyright (C) 2003-2023 Gordon Smyth
#   lmfit.R (mrlm)             Copyright (C) 2002-2020 Gordon Smyth
#   lmEffects.R                Copyright (C) 2016-2020 Gordon Smyth
#
# mrlm() additionally ports the IRLS / Huber-M-estimation algorithm from
# the R MASS package (which limma's mrlm calls per gene):
#   MASS::rlm                  Copyright (C) Brian Ripley, Bill Venables;
#                              GPL-2 | GPL-3
# Python port: Copyright (C) 2026 John Mulvey
"""
Linear model fitting for pylimma.

Implements the core linear model fitting from limma:
- lm_fit(): main entry point for fitting linear models to expression data
- lm_series(): OLS/WLS fitting for each gene
"""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from scipy import linalg

from .classes import MArrayLM, _is_anndata, as_matrix_weights, get_eawp
from .dups import duplicate_correlation, unwrap_dups

if TYPE_CHECKING:
    pass


def _qr_r_style(x: np.ndarray, tol: float = 1e-7) -> tuple:
    """
    QR decomposition matching R's ``qr(x, LAPACK=FALSE)`` semantics.

    Unlike ``scipy.linalg.qr(pivoting=True)``, which uses LAPACK column-norm
    pivoting and can reorder columns by magnitude at every step, R's
    Linpack-based ``qr`` keeps columns in the order supplied and only
    swaps a column to the end of the active set when its residual norm
    drops below ``tol`` -- i.e. when it is found collinear with the
    preceding columns. This makes the "later of two collinear columns is
    the redundant one" rule deterministic and reproducible.

    Parameters
    ----------
    x : ndarray
        Matrix of shape (n, p).
    tol : float
        Relative tolerance on column norms. R's default is ``1e-7``.

    Returns
    -------
    q, r : ndarray
        QR factorisation of ``x[:, pivot]`` in economy mode
        (``q`` is ``(n, p)``, ``r`` is ``(p, p)``).
    pivot : ndarray of int
        Permutation of column indices. ``pivot[:rank]`` are the
        original indices of estimable columns in their original order;
        ``pivot[rank:]`` are the original indices of columns flagged
        as collinear with an earlier column.
    rank : int
        Number of estimable columns.
    """
    x = np.asarray(x, dtype=np.float64)
    n, p = x.shape
    if p == 0:
        return (np.eye(n), np.zeros((n, 0)), np.zeros(0, dtype=int), 0)

    # Plain (non-pivoted) economy-mode QR on the original column order
    # so that redundant columns leave |R[i, i]| at zero in their
    # original slot. Scipy's economic QR returns the n x p R, which
    # is all we need for the rank check.
    _, r_probe = linalg.qr(x, mode="economic")
    diag_abs = np.abs(np.diag(r_probe))
    scale = diag_abs.max() if diag_abs.size else 0.0
    threshold = max(tol * scale, tol)
    estimable = diag_abs > threshold

    if estimable.all():
        pivot = np.arange(p, dtype=int)
        # Full-mode QR so q is n x n and q.T @ y includes the
        # (n - p) residual rows used by downstream sigma computation.
        q, r = linalg.qr(x, mode="full")
        return q, r, pivot, p

    est_idx = np.where(estimable)[0]
    nonest_idx = np.where(~estimable)[0]
    pivot = np.concatenate([est_idx, nonest_idx]).astype(int)
    rank = int(est_idx.size)
    # Redo QR on the reordered matrix so the top-left rank x rank
    # block of R is non-singular. Full mode ensures q is n x n.
    q, r = linalg.qr(x[:, pivot], mode="full")
    return q, r, pivot, rank


[docs] def is_fullrank(x: np.ndarray) -> bool: """ Check whether a matrix has full column rank. Parameters ---------- x : ndarray Matrix to check. Returns ------- bool True if x has full column rank. """ x = np.asarray(x) if x.ndim == 1: x = x.reshape(-1, 1) # eigvalsh returns eigenvalues in ascending order (smallest first) # R's eigen() returns them in descending order (largest first) # For full rank: largest eigenvalue > 0 and smallest/largest > 1e-13 eigvals = np.linalg.eigvalsh(x.T @ x) return eigvals[-1] > 0 and abs(eigvals[0] / eigvals[-1]) > 1e-13
[docs] def non_estimable( x: np.ndarray, coef_names: list[str] | None = None, ) -> list[str] | None: """ Check for non-estimable coefficients in a design matrix. Mirrors R's ``limma::nonEstimable``: runs a QR decomposition in original column order (``qr(x, LAPACK=FALSE)``), and if the design is rank-deficient returns the names of the redundant columns (the tail of ``qr$pivot``). When no column names are available, falls back to 1-based string indices to match R's ``colnames(x) <- as.character(1:p)`` default. Parameters ---------- x : ndarray, DataFrame, or patsy DesignMatrix Design matrix. Column names are extracted when present. coef_names : list of str, optional Explicit column names. Overrides any names carried on ``x``. Use this when the caller has already converted the design to an anonymous ndarray but still has the original names. Returns ------- list of str or None Names (or 1-based indices) of the non-estimable columns, or ``None`` when the design is full rank. """ # Extract column names if not supplied explicitly. if coef_names is None: if isinstance(x, pd.DataFrame): coef_names = [str(c) for c in x.columns] else: di = getattr(x, "design_info", None) if di is not None: cn = getattr(di, "column_names", None) if cn is not None: coef_names = [str(c) for c in cn] arr = np.asarray(x, dtype=np.float64) if arr.ndim == 1: arr = arr.reshape(-1, 1) p = arr.shape[1] q, r, pivot, rank = _qr_r_style(arr) if rank >= p: return None # R's nonEstimable: colnames(x) or as.character(1:p); then # replace any empty name with its 1-based index. if coef_names is None: names = [str(i + 1) for i in range(p)] else: names = [coef_names[i] if coef_names[i] else str(i + 1) for i in range(p)] return [names[pivot[i]] for i in range(rank, p)]
def lm_series( M: np.ndarray, design: np.ndarray, weights: np.ndarray | None = None, ) -> dict: """ Fit linear model for each gene to a series of arrays. Low-level function implementing OLS or WLS fitting. Parameters ---------- M : ndarray Expression matrix, shape (n_genes, n_samples). design : ndarray Design matrix, shape (n_samples, n_coefficients). weights : ndarray, optional Weights matrix, shape (n_genes, n_samples) or (n_samples,). Returns ------- dict coefficients : ndarray, shape (n_genes, n_coefs) stdev_unscaled : ndarray, shape (n_genes, n_coefs) sigma : ndarray, shape (n_genes,) df_residual : ndarray, shape (n_genes,) cov_coefficients : ndarray, shape (n_coefs, n_coefs) rank : int pivot : ndarray """ # R lmfit.R:97: `M <- as.matrix(M)` silently coerces DataFrame and # similar containers. Mirror that so internal callers (and the # rigorous tests) don't trip over a downstream pd-vs-np mismatch. M = np.asarray(M, dtype=np.float64) design = np.asarray(design, dtype=np.float64) n_genes, n_samples = M.shape n_coefs = design.shape[1] # Get coefficient names coef_names = [f"x{i}" for i in range(n_coefs)] # Normalise weight shape via R limma's asMatrixWeights logic; # as_matrix_weights always returns a fresh copy, so the # subsequent ``weights[weights <= 0] = np.nan`` is safe # (see known_diff_weights_mutation.md). # Track whether the input was array-weights-shaped (length-N # vector or (1, N) row matrix) - mirrors R's `arrayweights` # attribute (weights.R:74,83). lmfit.R:135 uses the attribute - # NOT the value pattern - to choose the fast path. Without this # flag, a (G, N) matrix with row-equal values would coincidentally # take pylimma's fast path while R takes the slow path, giving # ~19% drift in cov_coefficients (mirrors gls_series F1). is_array_weights = False if weights is not None: weights_in = np.asarray(weights) is_array_weights = (weights_in.ndim == 1 and weights_in.size == n_samples) or ( weights_in.ndim == 2 and weights_in.shape == (1, n_samples) ) weights = as_matrix_weights(weights, (n_genes, n_samples)) weights[weights <= 0] = np.nan M = M.copy() M[~np.isfinite(weights)] = np.nan # Check if we can do fast computation (no missing values, no probe-specific weights). has_missing = np.any(~np.isfinite(M)) has_probe_weights = weights is not None and not is_array_weights if not has_missing and not has_probe_weights: # Fast path: fit all genes at once return _lm_series_fast(M, design, weights, coef_names) else: # Slow path: iterate over genes return _lm_series_slow(M, design, weights, coef_names) def _lm_series_fast( M: np.ndarray, design: np.ndarray, weights: np.ndarray | None, coef_names: list[str], ) -> dict: """Fast path for lm_series when no missing values or probe weights.""" n_genes, n_samples = M.shape n_coefs = design.shape[1] if weights is None: # OLS q, r, pivot, rank = _qr_r_style(design) # Solve for coefficients: design @ coef = M.T # Using QR: coef = R^-1 @ Q.T @ M.T qty = q.T @ M.T # (n_coefs, n_genes) coefficients = linalg.solve_triangular(r[:rank, :rank], qty[:rank, :]) # Reorder coefficients according to pivot coef_full = np.full((n_coefs, n_genes), np.nan) coef_full[pivot[:rank], :] = coefficients coefficients = coef_full.T # (n_genes, n_coefs) # Residual standard deviation df_residual = n_samples - rank if df_residual > 0: # effects = Q.T @ M.T, residuals are effects[rank:] residual_effects = qty[rank:, :] sigma = np.sqrt(np.mean(residual_effects**2, axis=0)) else: sigma = np.full(n_genes, np.nan) # Covariance of coefficients r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) cov_coef_core = r_inv @ r_inv.T # Full covariance matrix with NaN for non-estimable cov_coefficients = np.full((n_coefs, n_coefs), np.nan) est_idx = pivot[:rank] cov_coefficients[np.ix_(est_idx, est_idx)] = cov_coef_core # Standard errors (unscaled by sigma) stdev_unscaled = np.full((n_genes, n_coefs), np.nan) stdev_unscaled[:, est_idx] = np.sqrt(np.diag(cov_coef_core)) else: # WLS with array weights (same weights for all genes) w = weights[0, :] sqrt_w = np.sqrt(w) # Weight the design and data design_w = design * sqrt_w[:, np.newaxis] M_w = M * sqrt_w q, r, pivot, rank = _qr_r_style(design_w) qty = q.T @ M_w.T coefficients = linalg.solve_triangular(r[:rank, :rank], qty[:rank, :]) coef_full = np.full((n_coefs, n_genes), np.nan) coef_full[pivot[:rank], :] = coefficients coefficients = coef_full.T df_residual = n_samples - rank if df_residual > 0: residual_effects = qty[rank:, :] sigma = np.sqrt(np.mean(residual_effects**2, axis=0)) else: sigma = np.full(n_genes, np.nan) r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) cov_coef_core = r_inv @ r_inv.T cov_coefficients = np.full((n_coefs, n_coefs), np.nan) est_idx = pivot[:rank] cov_coefficients[np.ix_(est_idx, est_idx)] = cov_coef_core stdev_unscaled = np.full((n_genes, n_coefs), np.nan) stdev_unscaled[:, est_idx] = np.sqrt(np.diag(cov_coef_core)) return { "coefficients": coefficients, "stdev_unscaled": stdev_unscaled, "sigma": sigma, "df_residual": np.full(n_genes, df_residual), "cov_coefficients": cov_coefficients, "pivot": pivot, "rank": rank, } def _lm_series_slow( M: np.ndarray, design: np.ndarray, weights: np.ndarray | None, coef_names: list[str], ) -> dict: """Slow path for lm_series with missing values or probe-specific weights.""" n_genes, n_samples = M.shape n_coefs = design.shape[1] coefficients = np.full((n_genes, n_coefs), np.nan) stdev_unscaled = np.full((n_genes, n_coefs), np.nan) sigma = np.full(n_genes, np.nan) df_residual = np.zeros(n_genes) for i in range(n_genes): y = M[i, :] obs = np.isfinite(y) if np.sum(obs) == 0: continue X = design[obs, :] y_obs = y[obs] if weights is None: # OLS q, r, pivot, rank = _qr_r_style(X) if rank == 0: continue qty = q.T @ y_obs coef = linalg.solve_triangular(r[:rank, :rank], qty[:rank]) coefficients[i, pivot[:rank]] = coef r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) stdev_unscaled[i, pivot[:rank]] = np.sqrt(np.diag(r_inv @ r_inv.T)) df_residual[i] = np.sum(obs) - rank if df_residual[i] > 0: resid_effects = qty[rank:] sigma[i] = np.sqrt(np.mean(resid_effects**2)) else: # WLS w = weights[i, obs] sqrt_w = np.sqrt(w) X_w = X * sqrt_w[:, np.newaxis] y_w = y_obs * sqrt_w q, r, pivot, rank = _qr_r_style(X_w) if rank == 0: continue qty = q.T @ y_w coef = linalg.solve_triangular(r[:rank, :rank], qty[:rank]) coefficients[i, pivot[:rank]] = coef r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) stdev_unscaled[i, pivot[:rank]] = np.sqrt(np.diag(r_inv @ r_inv.T)) df_residual[i] = np.sum(obs) - rank if df_residual[i] > 0: resid_effects = qty[rank:] sigma[i] = np.sqrt(np.mean(resid_effects**2)) # Compute covariance matrix from full design q, r, pivot, rank = _qr_r_style(design) r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) cov_coef_core = r_inv @ r_inv.T cov_coefficients = np.full((n_coefs, n_coefs), np.nan) est_idx = pivot[:rank] cov_coefficients[np.ix_(est_idx, est_idx)] = cov_coef_core return { "coefficients": coefficients, "stdev_unscaled": stdev_unscaled, "sigma": sigma, "df_residual": df_residual, "cov_coefficients": cov_coefficients, "pivot": pivot, "rank": rank, } def _huber_weights(u: np.ndarray, k: float = 1.345) -> np.ndarray: """Compute Huber weights: psi(u)/u = 1 if |u| <= k, else k/|u|.""" return np.where(np.abs(u) <= k, 1.0, k / np.abs(u)) def _bisquare_weights(u: np.ndarray, c: float = 4.685) -> np.ndarray: """Compute Tukey bisquare weights: (1 - (u/c)^2)^2 if |u| <= c, else 0.""" return np.where(np.abs(u) <= c, (1 - (u / c) ** 2) ** 2, 0.0)
[docs] def mrlm( M: np.ndarray, design: np.ndarray | None = None, ndups: int = 1, spacing: int = 1, weights: np.ndarray | None = None, method: str = "huber", maxit: int = 20, acc: float = 1e-4, k: float = 1.345, ) -> dict: """ Robustly fit linear model for each gene using M-estimation. Uses iteratively reweighted least squares with a robust loss function (Huber's T by default) to downweight outliers. Matches R's MASS::rlm. Parameters ---------- M : ndarray Expression matrix, shape (n_genes, n_samples). design : ndarray, optional Design matrix, shape (n_samples, n_coefficients). If None, uses intercept-only model. ndups : int, default 1 Number of within-array duplicate spots. spacing : int, default 1 Spacing between duplicate spots. weights : ndarray, optional Prior weights for observations. method : str, default "huber" Robust estimation method. Options: "huber", "bisquare". maxit : int, default 20 Maximum iterations for IRLS (matches R's default). acc : float, default 1e-4 Convergence tolerance (matches R's default). k : float, default 1.345 Tuning constant for Huber estimator (matches R's default). Returns ------- dict coefficients : ndarray, shape (n_genes, n_coefs) stdev_unscaled : ndarray, shape (n_genes, n_coefs) sigma : ndarray, shape (n_genes,) df_residual : ndarray, shape (n_genes,) cov_coefficients : ndarray, shape (n_coefs, n_coefs) Notes ----- This function implements R's MASS::rlm algorithm exactly: - MAD scale estimation: ``median(abs(resid)) / 0.6745`` - Huber weights: psi(u)/u where psi is the Huber function - Convergence on residuals: sqrt(sum((old - new)^2) / sum(old^2)) References ---------- Huber, P. J. (1981). Robust Statistics. Wiley. Venables, W. N. & Ripley, B. D. (2002). Modern Applied Statistics with S. """ M = np.asarray(M, dtype=np.float64) n_genes, n_samples = M.shape # Check design if design is None: design = np.ones((n_samples, 1)) design = np.asarray(design, dtype=np.float64) n_coefs = design.shape[1] # Normalise weight shape through asMatrixWeights and copy-guard # the subsequent NaN write (see lm_series header comment). if weights is not None: weights = as_matrix_weights(weights, (n_genes, n_samples)) weights[weights <= 0] = np.nan M = M.copy() M[~np.isfinite(weights)] = np.nan # Unwrap duplicates if needed if ndups > 1: M = unwrap_dups(M, ndups=ndups, spacing=spacing) design = np.kron(design, np.ones((ndups, 1))) if weights is not None: weights = unwrap_dups(weights, ndups=ndups, spacing=spacing) n_genes, n_samples = M.shape # Select weight function if method.lower() == "huber": def weight_fn(u): return _huber_weights(u, k) elif method.lower() == "bisquare": def weight_fn(u): return _bisquare_weights(u, c=4.685) else: raise ValueError(f"Unknown method: {method}. Use 'huber' or 'bisquare'") # Initialize output coefficients = np.full((n_genes, n_coefs), np.nan) stdev_unscaled = np.full((n_genes, n_coefs), np.nan) sigma = np.full(n_genes, np.nan) df_residual = np.zeros(n_genes) # Fit each gene using R's exact IRLS algorithm for i in range(n_genes): y = M[i, :] obs = np.isfinite(y) if np.sum(obs) <= n_coefs: continue y_obs = y[obs] X = design[obs, :] # Apply prior weights if provided if weights is not None: prior_w = weights[i, obs] sqrt_prior_w = np.sqrt(prior_w) y_work = y_obs * sqrt_prior_w X_work = X * sqrt_prior_w[:, np.newaxis] else: prior_w = None y_work = y_obs X_work = X # Check per-gene design rank. R's MASS::rlm errors with # "'x' is singular: singular fits are not implemented in 'rlm'" # for rank-deficient inputs; mirror that behaviour rather than # silently returning lstsq's minimum-norm solution. _, _, _, rank_x = _qr_r_style(X_work) if rank_x < n_coefs: raise ValueError("'x' is singular: singular fits are not implemented in 'rlm'") # OLS initial fit coef, _, rank_fit, _ = np.linalg.lstsq(X_work, y_work, rcond=None) resid = y_work - X_work @ coef # IRLS iterations (matching R's MASS::rlm) # R computes scale at START of each iteration and returns that value. # sqrt_w defaults to ones so that if the loop breaks on scale==0 # before any weighting (saturated design), the post-loop QR uses # the unweighted X - matching R's initial `lm.wfit(x, y, w=rep(1,n))`. scale = np.median(np.abs(resid)) / 0.6745 # Initial scale sqrt_w = np.ones(len(y_obs)) converged = False for _ in range(maxit): # MAD scale estimate (computed at start of iteration) scale = np.median(np.abs(resid)) / 0.6745 if scale == 0: converged = True # R: `done <- TRUE` in MASS::rlm break # Compute robust weights u = resid / scale w = weight_fn(u) # Weighted least squares sqrt_w = np.sqrt(w) Xw = X_work * sqrt_w[:, np.newaxis] yw = y_work * sqrt_w new_coef = np.linalg.lstsq(Xw, yw, rcond=None)[0] new_resid = y_work - X_work @ new_coef # Convergence on residuals (R's irls.delta) conv = np.sqrt(np.sum((resid - new_resid) ** 2) / max(1e-20, np.sum(resid**2))) coef = new_coef resid = new_resid if conv <= acc: converged = True break # MASS::rlm warns when IRLS hits maxit without converging; R's # mrlm forwards that warning per gene. if not converged: warnings.warn(f"'rlm' failed to converge in {maxit} steps") coefficients[i, :] = coef # Unscaled standard errors from QR decomposition of final weighted design # R's MASS::rlm uses the QR from final IRLS iteration (with robust weights) Xw_final = X_work * sqrt_w[:, np.newaxis] q, r, piv, rank = _qr_r_style(Xw_final) if rank > 0: r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) stdev_unscaled[i, piv[:rank]] = np.sqrt(np.diag(r_inv @ r_inv.T)) df_residual[i] = len(y_obs) - rank if df_residual[i] > 0: # Use the scale from the last iteration (R's behaviour) # R returns the scale computed at the START of the final iteration sigma[i] = scale # Compute covariance from full design q, r, pivot, rank = _qr_r_style(design) r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) cov_coef_core = r_inv @ r_inv.T cov_coefficients = np.full((n_coefs, n_coefs), np.nan) est_idx = pivot[:rank] cov_coefficients[np.ix_(est_idx, est_idx)] = cov_coef_core return { "coefficients": coefficients, "stdev_unscaled": stdev_unscaled, "sigma": sigma, "df_residual": df_residual, "cov_coefficients": cov_coefficients, "pivot": pivot, "rank": rank, }
[docs] def gls_series( M: np.ndarray, design: np.ndarray | None = None, ndups: int = 2, spacing: int = 1, block: np.ndarray | None = None, correlation: float | None = None, weights: np.ndarray | None = None, ) -> dict: """ Fit linear model for each gene using generalized least squares. Allows for correlation between samples, either through duplicate spots within arrays or through blocking factors. Parameters ---------- M : ndarray Expression matrix, shape (n_genes, n_samples). design : ndarray, optional Design matrix, shape (n_samples, n_coefficients). If None, uses intercept-only model. ndups : int, default 1 Number of within-array duplicate spots. spacing : int, default 1 Spacing between duplicate spots. block : array_like, optional Block indicator for correlated samples. If provided, ndups and spacing are ignored. correlation : float, optional Intra-block correlation. If None, will need to be estimated externally (e.g., via duplicate_correlation()). weights : ndarray, optional Observation weights. Returns ------- dict coefficients : ndarray, shape (n_genes, n_coefs) stdev_unscaled : ndarray, shape (n_genes, n_coefs) sigma : ndarray, shape (n_genes,) df_residual : ndarray, shape (n_genes,) cov_coefficients : ndarray, shape (n_coefs, n_coefs) correlation : float block : ndarray or None ndups : int spacing : int Notes ----- This function uses Cholesky decomposition to transform the GLS problem to an equivalent OLS problem. The correlation structure is either: - Within-array duplicates (ndups > 1): spots within the same array are correlated with the specified correlation. - Between-sample blocks (block != None): samples within the same block are correlated. References ---------- Smyth, G. K., Michaud, J. and Scott, H. S. (2005). Use of within-array replicate spots for assessing differential expression in microarray experiments. Bioinformatics, 21, 2067-2075. """ M = np.asarray(M, dtype=np.float64) n_genes, n_arrays = M.shape # Check design if design is None: design = np.ones((n_arrays, 1)) design = np.asarray(design, dtype=np.float64) if design.shape[0] != n_arrays: raise ValueError("Number of rows of design matrix does not match number of arrays") n_coefs = design.shape[1] # Check correlation - auto-estimate if not provided (R parity) if correlation is None: dc_result = duplicate_correlation( M, design=design, ndups=ndups, spacing=spacing, block=block, weights=weights ) correlation = dc_result["consensus_correlation"] if abs(correlation) >= 1: raise ValueError("correlation is 1 or -1, so the model is degenerate") # Normalise weight shape through asMatrixWeights; helper returns # a fresh copy so the subsequent NaN/zero writes are safe. # Track whether the input was array-weights-shaped (length-N vector # or (1, N) row matrix) - this mirrors R's `arrayweights` attribute # set inside asMatrixWeights (weights.R:74,83). The attribute is # stripped by unwrapdups (dups.R:14-16), and limma's fast/slow # dispatch (lmfit.R:301) reads the attribute - not the value pattern. is_array_weights = False if weights is not None: weights_in = np.asarray(weights) is_array_weights = (weights_in.ndim == 1 and weights_in.size == n_arrays) or ( weights_in.ndim == 2 and weights_in.shape == (1, n_arrays) ) weights = as_matrix_weights(weights, (n_genes, n_arrays)) weights[np.isnan(weights)] = 0 M = M.copy() M[weights < 1e-15] = np.nan weights[weights < 1e-15] = np.nan # Build correlation matrix if block is None: # Within-array duplicates if ndups < 2: warnings.warn("No duplicates (ndups < 2)") ndups = 1 correlation = 0 # Correlation matrix: arrays x arrays with duplicates # Each array has ndups spots that are correlated corr_block = np.full((ndups, ndups), correlation) np.fill_diagonal(corr_block, 1.0) cormatrix = linalg.block_diag(*[corr_block for _ in range(n_arrays)]) # Unwrap duplicates M = unwrap_dups(M, ndups=ndups, spacing=spacing) if weights is not None: weights = unwrap_dups(weights, ndups=ndups, spacing=spacing) is_array_weights = False # R's unwrapdups strips arrayweights attr # Expand design for duplicates design = np.kron(design, np.ones((ndups, 1))) n_genes, n_samples = M.shape else: # Correlated samples (blocking) ndups = 1 spacing = 1 block = np.asarray(block) if len(block) != n_arrays: raise ValueError("Length of block does not match number of arrays") # Build correlation matrix from block structure unique_blocks = np.unique(block) Z = np.array( [[block[i] == ub for ub in unique_blocks] for i in range(n_arrays)], dtype=float ) cormatrix = Z @ (correlation * Z.T) np.fill_diagonal(cormatrix, 1.0) n_samples = n_arrays # Initialize output stdev_unscaled = np.full((n_genes, n_coefs), np.nan) # Check if fast computation is possible. Mirror R lmfit.R:301: # `NoProbeWts <- all(is.finite(M)) && (is.null(weights) || arrayweights attr)`. has_missing = np.any(~np.isfinite(M)) has_probe_weights = weights is not None and not is_array_weights if not has_missing and not has_probe_weights: # Fast path: fit all genes at once V = cormatrix.copy() if weights is not None: # Array weights wrs = 1.0 / np.sqrt(weights[0, :]) V = wrs[:, np.newaxis] * V * wrs[np.newaxis, :] # Cholesky decomposition chol_V = linalg.cholesky(V, lower=False) # Transform data: y* = L^-T @ y y = linalg.solve_triangular(chol_V, M.T, trans="T") # (n_samples, n_genes) # Transform design: X* = L^-T @ X X = linalg.solve_triangular(chol_V, design, trans="T") # OLS on transformed data q, r, pivot, rank = _qr_r_style(X) # Coefficients qty = q.T @ y # (n_coefs, n_genes) coefficients = linalg.solve_triangular(r[:rank, :rank], qty[:rank, :]) coef_full = np.full((n_coefs, n_genes), np.nan) coef_full[pivot[:rank], :] = coefficients coefficients = coef_full.T # Residual standard deviation df_residual = n_samples - rank if df_residual > 0: sigma = np.sqrt(np.mean(qty[rank:, :] ** 2, axis=0)) else: sigma = np.full(n_genes, np.nan) # Covariance of coefficients r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) cov_coef_core = r_inv @ r_inv.T cov_coefficients = np.full((n_coefs, n_coefs), np.nan) est_idx = pivot[:rank] cov_coefficients[np.ix_(est_idx, est_idx)] = cov_coef_core stdev_unscaled[:, est_idx] = np.sqrt(np.diag(cov_coef_core)) return { "coefficients": coefficients, "stdev_unscaled": stdev_unscaled, "sigma": sigma, "df_residual": np.full(n_genes, df_residual), "cov_coefficients": cov_coefficients, "pivot": pivot, "rank": rank, "ndups": ndups, "spacing": spacing, "block": block, "correlation": correlation, } # Slow path: iterate over genes coefficients = np.full((n_genes, n_coefs), np.nan) sigma = np.full(n_genes, np.nan) df_residual = np.zeros(n_genes) for i in range(n_genes): y = M[i, :] obs = np.isfinite(y) if np.sum(obs) == 0: continue y_obs = y[obs] X = design[obs, :] V = cormatrix[np.ix_(obs, obs)] if weights is not None: wrs = 1.0 / np.sqrt(weights[i, obs]) V = wrs[:, np.newaxis] * V * wrs[np.newaxis, :] # Cholesky decomposition try: chol_V = linalg.cholesky(V, lower=False) except linalg.LinAlgError: # Matrix not positive definite continue # Transform y_t = linalg.solve_triangular(chol_V, y_obs, trans="T") if np.all(X == 0): n = len(y_obs) df_residual[i] = n sigma[i] = np.sqrt(np.mean(y_t**2)) else: X_t = linalg.solve_triangular(chol_V, X, trans="T") # OLS on transformed q, r, pivot, rank = _qr_r_style(X_t) if rank == 0: continue qty = q.T @ y_t coef = linalg.solve_triangular(r[:rank, :rank], qty[:rank]) coefficients[i, pivot[:rank]] = coef r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) stdev_unscaled[i, pivot[:rank]] = np.sqrt(np.diag(r_inv @ r_inv.T)) df_residual[i] = len(y_obs) - rank if df_residual[i] > 0: # Residual SS from QR decomposition: qty[rank:] are residual effects # This avoids dimension issues when rank < n_coefs sigma[i] = np.sqrt(np.sum(qty[rank:] ** 2) / df_residual[i]) # Compute covariance from full correlation matrix chol_V = linalg.cholesky(cormatrix, lower=False) X_t = linalg.solve_triangular(chol_V, design, trans="T") q, r, pivot, rank = _qr_r_style(X_t) r_inv = linalg.solve_triangular(r[:rank, :rank], np.eye(rank)) cov_coef_core = r_inv @ r_inv.T cov_coefficients = np.full((n_coefs, n_coefs), np.nan) est_idx = pivot[:rank] cov_coefficients[np.ix_(est_idx, est_idx)] = cov_coef_core return { "coefficients": coefficients, "stdev_unscaled": stdev_unscaled, "sigma": sigma, "df_residual": df_residual, "cov_coefficients": cov_coefficients, "pivot": pivot, "rank": rank, "ndups": ndups, "spacing": spacing, "block": block, "correlation": correlation, }
def _parse_design( design, data: pd.DataFrame | None = None, n_samples: int | None = None, ) -> tuple[np.ndarray, list[str] | None]: """ Parse design matrix from various input types. Parameters ---------- design : array_like or str or None Design matrix as numpy array, DataFrame, patsy ``DesignMatrix``, or formula string like ``"~ group + batch"``. data : DataFrame, optional Sample metadata for formula parsing. n_samples : int, optional Number of samples (for default intercept-only design). Returns ------- design : ndarray Design matrix. names : list of str or None Column names if the input carried them (DataFrame columns, patsy ``design_info.column_names``, or formula output). None otherwise; callers should fall back to 1-based string indices matching R's ``colnames(x) <- as.character(1:p)``. """ names: list[str] | None = None if design is None: if n_samples is None: raise ValueError("Must provide design or n_samples") return np.ones((n_samples, 1)), None if isinstance(design, str): # Formula string - use patsy try: import patsy except ImportError: raise ImportError( "patsy is required for formula-based design matrices. " "Install with: pip install patsy" ) if data is None: raise ValueError("data must be provided when design is a formula string") dm = patsy.dmatrix(design, data) try: names = [str(c) for c in dm.design_info.column_names] except AttributeError: names = None return np.asarray(dm), names if isinstance(design, pd.DataFrame): names = [str(c) for c in design.columns] else: di = getattr(design, "design_info", None) if di is not None: cn = getattr(di, "column_names", None) if cn is not None: names = [str(c) for c in cn] arr = np.asarray(design, dtype=np.float64) if arr.ndim == 1: arr = arr.reshape(-1, 1) return arr, names
[docs] def lm_fit( data, design=None, ndups: int | None = None, spacing: int | None = None, block: np.ndarray | None = None, correlation: float | None = None, weights: np.ndarray | None = None, method: str = "ls", key: str = "pylimma", layer: str | None = None, weights_layer: str | None = None, **mrlm_kwargs, ) -> dict | None: """ Fit linear models to expression data. This is the main entry point for fitting gene-wise linear models. Accepts either an AnnData object or a numpy array/DataFrame. Parameters ---------- data : AnnData, ndarray, or DataFrame Expression data. If AnnData, reads from adata.X (samples x genes) or specified layer, and stores results in adata.uns[key]. If ndarray or DataFrame, expects (n_genes, n_samples) and returns results as a dict. **Important:** Expression values must be normalised and log-transformed before calling this function. design : ndarray or str, optional Design matrix (n_samples, n_coefficients) or formula string like "~ group + batch". If None, uses intercept-only model. ndups : int, default 1 Number of within-array duplicate spots. spacing : int, default 1 Spacing between duplicate spots in the expression matrix. block : array_like, optional Block indicator for correlated samples. When provided, samples within the same block are assumed to be correlated. correlation : float, optional Intra-block or intra-duplicate correlation. Required when ndups > 1 or block is provided. Use duplicate_correlation() to estimate this value. weights : ndarray, optional Observation weights. Can be: - 1D array of length n_samples (array weights) - 2D array of shape (n_genes, n_samples) (gene-specific weights) method : str, default "ls" Fitting method. Options: - "ls": least squares (default) - "robust": robust regression using M-estimation key : str, default "pylimma" Key for storing results in adata.uns (AnnData input only). layer : str, optional Layer to use for expression data (AnnData input only). If None, uses adata.X. weights_layer : str, optional AnnData-only. Layer to read as observation weights. When ``None`` (default) and ``layer`` ends in ``"_E"``, the companion layer ``{layer[:-2]}_weights`` is auto-loaded if present (voom / vooma convention). Set this when ``voom``/``vooma`` were called with a non-default ``weights_layer=`` so the read side matches. Returns ------- dict or None If input is ndarray/DataFrame, returns dict with fit results. If input is AnnData, stores results in adata.uns[key] and returns None. Notes ----- The function dispatches to different fitting algorithms based on parameters: - If method="robust", uses mrlm() for robust M-estimation - If ndups < 2 and block is None, uses lm_series() for simple OLS/WLS - If ndups >= 2 or block is provided, uses gls_series() for GLS The fit results include: - coefficients: estimated coefficients (n_genes, n_coefs) - stdev_unscaled: unscaled standard errors (n_genes, n_coefs) - sigma: residual standard deviation (n_genes,) - df_residual: residual degrees of freedom (n_genes,) - cov_coefficients: coefficient covariance matrix (n_coefs, n_coefs) - Amean: mean expression per gene (n_genes,) - design: the design matrix used References ---------- Smyth, G. K. (2004). Linear models and empirical Bayes methods for assessing differential expression in microarray experiments. Statistical Applications in Genetics and Molecular Biology, 3(1), Article 3. """ # Polymorphic input dispatch is_anndata = _is_anndata(data) adata = data if is_anndata else None eawp = get_eawp(data, layer=layer, weights_layer=weights_layer) expr = eawp["exprs"] # Pick up weights from the input object if not passed explicitly. # For EList this surfaces ``obj["weights"]``; for AnnData it surfaces # a companion ``{stem}_weights`` layer auto-loaded by get_eawp. if weights is None and eawp.get("weights") is not None: weights = np.asarray(eawp["weights"], dtype=np.float64) # Resolve ndups/spacing from printer metadata when not explicitly # supplied, matching R's lmFit (lmfit.R:47-50): # if(is.null(ndups)) ndups <- y$printer$ndups # if(is.null(spacing)) spacing <- y$printer$spacing def _printer_attr(obj, key, default=1): printer = None if hasattr(obj, "printer"): printer = getattr(obj, "printer") elif isinstance(obj, dict): printer = obj.get("printer") if isinstance(printer, dict): value = printer.get(key) if value is not None: return int(value) return default if ndups is None: ndups = _printer_attr(data, "ndups", 1) if spacing is None: spacing = _printer_attr(data, "spacing", 1) # Extract metadata needed downstream (gene names and sample data). get_eawp # gives us probes/targets as DataFrames but lm_fit historically extracted # gene_names from DataFrame.index and sample_data from adata.obs - preserve # both conventions here. if is_anndata: sample_data = adata.obs gene_names = list(adata.var_names) if adata.var_names is not None else None elif isinstance(data, pd.DataFrame): sample_data = None gene_names = list(data.index) else: sample_data = None gene_names = None if expr.ndim != 2: raise ValueError("Expression data must be 2-dimensional") n_genes, n_samples = expr.shape # R lmfit.R:31: `if(!nrow(y$exprs)) stop("expression matrix has zero rows")`. if n_genes == 0: raise ValueError("expression matrix has zero rows") # Fall back to the design carried on the input object before # defaulting to an intercept, matching R's lmFit one-liner: # if(is.null(design)) design <- y$design # EList populates eawp["design"] via _eawp_from_elist_like; the # AnnData branch of get_eawp populates it from # adata.uns[<stem>]["design"] when called with layer="<stem>_E". if design is None and eawp.get("design") is not None: design = eawp["design"] # Parse design matrix. Column names (if any) flow through to # non_estimable so the "Coefficients not estimable" warning # identifies columns by name rather than by index when possible. design, design_names = _parse_design(design, data=sample_data, n_samples=n_samples) if design.shape[0] != n_samples: raise ValueError( f"Design matrix has {design.shape[0]} rows but expression data has {n_samples} samples" ) # Check for non-estimable coefficients ne = non_estimable(design, coef_names=design_names) if ne is not None: warnings.warn(f"Coefficients not estimable: {', '.join(ne)}") # Validate method (matches R's match.arg) if method not in ("ls", "robust"): raise ValueError(f"method '{method}' not recognized. Must be 'ls' or 'robust'.") # Validate correlation requirement. mrlm (robust) does not use # correlation - duplicates are unwrapped and fit per-gene via M-estimation # - so correlation is only required for the GLS (ls) path. if method == "ls" and (ndups >= 2 or block is not None) and correlation is None: raise ValueError( "correlation must be provided when ndups >= 2 or block is specified. " "Use duplicate_correlation() to estimate it." ) # Dispatch to appropriate fitting function if method == "robust": # R warns when robust is combined with blocking or within-array # duplicates (correlation cannot be combined with robust regression) # but still calls mrlm. Match that behaviour. if block is not None or ndups > 1: warnings.warn( "Correlation cannot be combined with robust regression. " "If you wish to use blocking or duplicate correlation, " "then use least squares regression." ) fit = mrlm( expr, design, ndups=ndups, spacing=spacing, weights=weights, **mrlm_kwargs, ) elif ndups < 2 and block is None: # Simple OLS or WLS fit = lm_series(expr, design, weights=weights) else: # GLS for correlated samples (duplicates or blocking) fit = gls_series( expr, design, ndups=ndups, spacing=spacing, block=block, correlation=correlation, weights=weights, ) # R lmfit.R:77-81: warn when some genes have a mix of NA and non-NA # coefficients (i.e. partial estimability per-gene). coefs_arr = np.asarray(fit["coefficients"]) if coefs_arr.ndim == 2 and coefs_arr.shape[1] > 1: per_gene_nas = np.sum(np.isnan(coefs_arr), axis=1) n_partial = int(np.sum((per_gene_nas > 0) & (per_gene_nas < coefs_arr.shape[1]))) if n_partial > 0: warnings.warn(f"Partial NA coefficients for {n_partial} probe(s)") # Promote to MArrayLM and add post-fit slots. # R lmfit.R:55-62: `y$Amean <- rowMeans(y$exprs, na.rm=TRUE)`, then # if ndups>1 reshape via `unwrapdups` and `rowMeans` again so Amean # has one entry per fitted gene (matching coefficients row count). fit = MArrayLM(fit) amean = np.nanmean(expr, axis=1) if ndups > 1: amean = np.nanmean( unwrap_dups(amean.reshape(-1, 1), ndups=ndups, spacing=spacing), axis=1, ) fit["Amean"] = amean fit["design"] = design # R's lmFit does fit$genes <- y$probes (lmfit.R:84). pylimma's # AnnData/DataFrame branches pre-compute gene_names from var_names # / index respectively and those callers expect a bare list here. # For EList / ndarray input we have no such pre-computation, so # fall back to the probes DataFrame captured by get_eawp's EList # branch (classes.py:748-758 populates y["probes"] from # EList["genes"]). Without this, EList callers get fit["genes"] = # None and lose their gene annotations through the pipeline. if gene_names is not None: fit["genes"] = gene_names else: fit["genes"] = eawp.get("probes") # Coefficient names from the design matrix. R's lm.series stores them # as colnames(fit$coefficients) (lmfit.R:125); pylimma stores # coefficients as a bare ndarray so we keep the names in a sidecar # slot. topTable's "Removing intercept" logic and contrasts_fit's # string-coefficient lookup both read fit["coef_names"]. Skip when # design was a bare ndarray with no names (matches R, where # colnames(design) is NULL in that case) and avoids writing a None # value into adata.uns[key] which anndata's h5ad writer cannot # serialise cleanly. if design_names is not None: fit["coef_names"] = design_names # R's lmFit records the fitting method (lmfit.R:86); plotExons and # other diagnostics rely on it. fit["method"] = method # Propagate sample metadata (y$targets on an EList, adata.obs on an # AnnData). R's lmFit does the same via fit$targets <- y$targets. # Downstream diagnostic plots (plotSA / plotMDS) expect it. if eawp.get("targets") is not None: fit["targets"] = eawp["targets"] if is_anndata: # Store as plain dict so adata.write_h5ad() works. anndata's # IO registry dispatches on exact type, not isinstance, so the # MArrayLM subclass trips `IORegistryError`. Users can rewrap # via pylimma.MArrayLM(adata.uns[key]) if they need the # class's method API back. adata.uns[key] = dict(fit) return None else: return fit