Source code for pylimma.dups

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
#   dups.R (unwrapdups, avedups, avereps, duplicateCorrelation)
#                              Copyright (C) 2002-2021 Gordon Smyth
#
# duplicate_correlation() additionally ports two routines from the R statmod
# package (which limma's duplicateCorrelation calls per gene):
#   statmod::mixedModel2Fit    Copyright (C) Gordon Smyth, Lizhong Chen;
#                              GPL-2 | GPL-3
#   statmod::glmgam.fit        Copyright (C) Gordon Smyth, Lizhong Chen;
#                              GPL-2 | GPL-3
# Python port: Copyright (C) 2026 John Mulvey
"""
Functions to handle duplicate spots or blocking.

Provides utilities for working with within-array replicate spots.
"""

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd
from scipy import linalg

from .classes import EList, _is_anndata, get_eawp


def _mixed_model_2_fit(
    y: np.ndarray,
    X: np.ndarray,
    Z: np.ndarray,
    w: np.ndarray | None = None,
    tol: float = 1e-6,
    maxit: int = 50,
) -> tuple[float, float] | None:
    """
    Fit mixed linear model and return variance components.

    This is a Python port of statmod::mixedModel2Fit from R.
    Uses moment-based REML estimation with gamma GLM refinement.

    Parameters
    ----------
    y : ndarray
        Response vector, shape (n,).
    X : ndarray
        Fixed effects design matrix, shape (n, p).
    Z : ndarray
        Random effects design matrix, shape (n, q).
    w : ndarray, optional
        Observation weights, shape (n,).
    tol : float, default 1e-6
        Convergence tolerance for gamma GLM.
    maxit : int, default 50
        Maximum iterations for gamma GLM.

    Returns
    -------
    tuple or None
        (residual_variance, block_variance) if successful, None otherwise.
    """
    # Apply weights if provided
    # NOTE: R's statmod::mixedModel2Fit applies sqrt(w) to y and X only, NOT to Z
    if w is not None:
        sw = np.sqrt(w)
        y = sw * y
        X = sw[:, np.newaxis] * X if X.ndim == 2 else sw * X

    X = np.atleast_2d(X)
    Z = np.atleast_2d(Z)
    mx = X.shape[0]

    # QR decomposition of X
    Q, R, pivot = linalg.qr(X, pivoting=True)
    r = np.sum(np.abs(np.diag(R)) > 1e-10)  # rank

    if r == 0:
        return None

    mq = mx - r
    if mq == 0:
        return None

    # Project Z and y onto residual space (orthogonal to X)
    # QtZ = Q2' @ Z where Q2 is the null space of X
    Q2 = Q[:, r:]  # null space basis
    QtZ = Q2.T @ Z  # shape (mq, nz)
    Qty = Q2.T @ y  # shape (mq,)

    # SVD of QtZ with full U (matches R's La.svd(QtZ, nu=mq, nv=0)).
    # full_matrices=True returns U of shape (mq, mq) with a deterministic
    # null-space extension - using full_matrices=False and filling with
    # random vectors perturbs uqy for positions where d=0 and biases the
    # variance-components regression.
    try:
        U, s, Vh = linalg.svd(QtZ, full_matrices=True)
    except (np.linalg.LinAlgError, linalg.LinAlgError, ValueError):
        return None

    # Transform y to diagonal space
    uqy = U.T @ Qty  # shape (mq,)

    # Set up regression: dy ~ dx where dy = uqy^2, dx = [1, d^2]
    d = np.zeros(mq)
    d[: len(s)] = s**2

    dx = np.column_stack([np.ones(mq), d])
    dy = uqy**2

    # Initial OLS fit
    try:
        dfit_coef, _, _, _ = linalg.lstsq(dx, dy)
    except (np.linalg.LinAlgError, linalg.LinAlgError, ValueError):
        return None

    varcomp = dfit_coef
    dfitted = dx @ varcomp

    # Refine with gamma GLM if conditions met
    # if (mq > 2 && sum(abs(d) > 1e-15) > 1 && var(d) > 1e-15)
    if mq > 2 and np.sum(np.abs(d) > 1e-15) > 1 and np.var(d) > 1e-15:
        # Starting values
        if np.all(dfitted >= 0):
            start = varcomp.copy()
        else:
            start = np.array([np.mean(dy), 0.0])

        # Gamma GLM with log link using IRLS
        varcomp = _glmgam_fit(dx, dy, start, tol=tol, maxit=maxit)
        if varcomp is None:
            varcomp = dfit_coef

    # Return variance components: [residual, block]
    # Note: negative block variance is allowed (will give negative ICC)
    # The bounding to valid correlation range happens in duplicate_correlation
    return (float(varcomp[0]), float(varcomp[1]))


def _deviance_gamma(y: np.ndarray, mu: np.ndarray) -> float:
    """Gamma deviance matching statmod::glmgam.fit's internal deviance.gamma."""
    if np.any(mu < 0):
        return np.inf
    o = (y < 1e-15) & (mu < 1e-15)
    if np.all(o):
        return 0.0
    if np.any(o):
        y1 = y[~o]
        mu1 = mu[~o]
        return 2.0 * np.sum((y1 - mu1) / mu1 - np.log(y1 / mu1))
    return 2.0 * np.sum((y - mu) / mu - np.log(y / mu))


def _glmgam_fit(
    X: np.ndarray,
    y: np.ndarray,
    start: np.ndarray,
    tol: float = 1e-6,
    maxit: int = 50,
) -> np.ndarray | None:
    """
    Fit gamma GLM with identity link via damped Fisher scoring.

    Faithful port of statmod::glmgam.fit: Levenberg-Marquardt damping on the
    Fisher information with a deviance-based line search. The simpler
    unweighted-IRLS approach drifts by ~1e-5 per fit compared to R because it
    skips the damping/line-search step.

    Parameters
    ----------
    X : ndarray
        Design matrix, shape (n, p).
    y : ndarray
        Non-negative response vector, shape (n,).
    start : ndarray
        Starting coefficients, shape (p,).
    tol : float
        Convergence tolerance on inner product of score and step.
    maxit : int
        Maximum outer iterations.
    """
    X = np.atleast_2d(X)
    n, p = X.shape
    y = np.asarray(y, dtype=np.float64)

    if np.any(y < 0):
        raise ValueError("y must be non-negative")
    maxy = np.max(y) if y.size else 0.0
    if maxy == 0.0:
        return np.zeros(p)

    beta = np.asarray(start, dtype=np.float64).copy()
    mu = X @ beta
    if np.any(mu < 0):
        return None

    dev = _deviance_gamma(y, mu)

    I_p = np.eye(p)
    lambda_ = 0.0  # set on first iteration

    for iteration in range(1, maxit + 1):
        v = mu**2
        v = np.maximum(v, np.max(v) / 1e3)
        XVX = X.T @ (X / v[:, np.newaxis])
        maxinfo = np.max(np.diag(XVX))

        if iteration == 1:
            lambda_ = abs(np.mean(np.diag(XVX))) / p

        dl = X.T @ ((y - mu) / v)
        beta_old = beta
        dev_old = dev

        # Damped step with line search on deviance
        lev = 0
        dbeta = np.zeros(p)
        while True:
            lev += 1
            try:
                L = linalg.cholesky(XVX + lambda_ * I_p, lower=False)
            except linalg.LinAlgError:
                return None
            # Solve (R^T R) dbeta = dl via two triangular solves
            tmp = linalg.solve_triangular(L.T, dl, lower=True)
            dbeta = linalg.solve_triangular(L, tmp, lower=False)
            beta = beta_old + dbeta
            mu = X @ beta
            dev = _deviance_gamma(y, mu)
            if dev <= dev_old or (np.max(mu) > 0 and dev / np.max(mu) < 1e-15):
                break
            if lambda_ / maxinfo > 1e15:
                beta = beta_old
                break
            lambda_ *= 2.0

        if lambda_ / maxinfo > 1e15:
            break
        if lev == 1:
            lambda_ /= 10.0
        if dl @ dbeta < tol or (np.max(mu) > 0 and dev / np.max(mu) < 1e-15):
            break
        if iteration >= maxit:
            break

    return beta


def unwrap_dups(
    M: np.ndarray,
    ndups: int = 2,
    spacing: int = 1,
) -> np.ndarray:
    """
    Unwrap duplicate spots to long format.

    Reshapes expression matrix so that all spots for a given gene are in
    one row, with duplicate measurements in separate columns.

    Parameters
    ----------
    M : ndarray
        Expression matrix, shape (n_spots, n_arrays).
    ndups : int, default 2
        Number of duplicates.
    spacing : int, default 1
        Spacing between duplicates in the original matrix.

    Returns
    -------
    ndarray
        Unwrapped matrix, shape (n_genes, n_arrays * ndups).

    Notes
    -----
    This function matches R limma's unwrapdups behaviour. The output has
    columns ordered as: [dup0-array0, dup1-array0, ..., dup0-array1, ...].
    """
    if ndups == 1:
        return M

    M = np.asarray(M, dtype=np.float64)
    n_spots, n_arrays = M.shape
    n_groups = n_spots // ndups // spacing

    # Use Fortran order to match R's reshape behaviour
    # R: dim(M) <- c(spacing, ndups, ngroups, nslides)
    M = M.reshape((spacing, ndups, n_groups, n_arrays), order="F")

    # R: aperm(M, c(1,3,2,4))
    M = np.transpose(M, (0, 2, 1, 3))

    # R: dim(M) <- c(spacing*ngroups, ndups*nslides)
    M = M.reshape((spacing * n_groups, ndups * n_arrays), order="F")

    return M


def unique_genelist(
    genelist: np.ndarray | pd.DataFrame | list,
    ndups: int = 2,
    spacing: int = 1,
) -> np.ndarray | pd.DataFrame | list:
    """
    Extract unique gene identifiers from duplicate spots.

    Eliminates entries in genelist that correspond to duplicate spots,
    keeping only the first entry for each gene.

    Parameters
    ----------
    genelist : array_like, DataFrame, or list
        Gene identifiers or annotation.
    ndups : int, default 2
        Number of duplicates.
    spacing : int, default 1
        Spacing between duplicates.

    Returns
    -------
    array_like, DataFrame, or list
        Gene list with duplicates removed.
    """
    if ndups <= 1:
        return genelist

    # Get indices of first duplicate
    n = len(genelist) if hasattr(genelist, "__len__") else genelist.shape[0]
    indices = np.arange(n)
    unwrapped = unwrap_dups(indices.reshape(-1, 1), ndups=ndups, spacing=spacing)
    keep_idx = unwrapped[:, 0].astype(int)

    if isinstance(genelist, pd.DataFrame):
        return genelist.iloc[keep_idx].reset_index(drop=True)
    elif isinstance(genelist, (list, tuple)):
        return [genelist[i] for i in keep_idx]
    else:
        genelist = np.asarray(genelist)
        if genelist.ndim == 1:
            return genelist[keep_idx]
        else:
            return genelist[keep_idx, :]


[docs] def ave_dups( x: np.ndarray, ndups: int = 2, spacing: int = 1, weights: np.ndarray | None = None, ) -> np.ndarray: """ Average over duplicate spots. Computes the (weighted) mean across replicate spots for each gene. Parameters ---------- x : ndarray Expression matrix, shape (n_spots, n_arrays). ndups : int, default 2 Number of duplicates. spacing : int, default 1 Spacing between duplicates. weights : ndarray, optional Weights matrix with same shape as x. Returns ------- ndarray Averaged matrix, shape (n_genes, n_arrays). """ if ndups == 1: return x x = np.asarray(x, dtype=np.float64) n_spots, n_arrays = x.shape n_groups = n_spots // ndups // spacing # Use Fortran order to match R's reshape behaviour # R: dim(x) <- c(spacing, ndups, ngroups*nslides) x = x.reshape((spacing, ndups, n_groups * n_arrays), order="F") # R: aperm(x, c(2,1,3)) x = np.transpose(x, (1, 0, 2)) if weights is None: # Simple mean along dups axis (axis 0) result = np.nanmean(x, axis=0) else: # Weighted mean weights = np.asarray(weights, dtype=np.float64) weights = weights.reshape((spacing, ndups, n_groups * n_arrays), order="F") weights = np.transpose(weights, (1, 0, 2)) # Handle NaN and negative weights weights = weights.copy() weights[np.isnan(weights) | np.isnan(x)] = 0 weights[weights < 0] = 0 with np.errstate(invalid="ignore", divide="ignore"): result = np.nansum(weights * x, axis=0) / np.sum(weights, axis=0) # Reshape back: R: dim(x) <- c(spacing*ngroups, nslides) result = result.reshape((spacing * n_groups, n_arrays), order="F") return result
[docs] def avereps( x, ID: np.ndarray | list | None = None, ) -> np.ndarray | pd.DataFrame: """ Average over irregular replicate probes. Computes the mean across replicate probes identified by ID, mirroring R's ``limma::avereps``. When ``x`` carries row labels (pandas DataFrame index or Series ``.name``) and ``ID`` is not supplied, the row labels are used as probe IDs - the same default as R's ``avereps.default(x, ID=rownames(x))``. Parameters ---------- x : ndarray or DataFrame Expression matrix, shape (n_probes, n_arrays). ID : array_like, optional Probe identifiers. Probes with the same ID are averaged. If None and ``x`` is a DataFrame, ``x.index`` is used. If no source of IDs is available, raises ``ValueError`` matching R's ``"No probe IDs"`` error. Returns ------- ndarray or DataFrame Matrix of averaged rows, one per unique ID in order of first appearance. Returned as a DataFrame (indexed by the unique IDs, columns preserved) when ``x`` was a DataFrame, otherwise as an ndarray. To also recover the unique ID vector from an ndarray return value, read it off with ``np.unique(ID, return_index=True)`` or pass ``x`` as a DataFrame. Examples -------- R parity (matrix-return): >>> x = np.array([[1, 2], [3, 4], [5, 6]]) >>> ID = ["A", "A", "B"] >>> avereps(x, ID) array([[2., 3.], [5., 6.]]) DataFrame-in, DataFrame-out (ID defaults to index): >>> df = pd.DataFrame([[1, 2], [3, 4], [5, 6]], index=["A", "A", "B"]) >>> avereps(df) For ``AnnData`` input, ``ID`` defaults to ``adata.var_names`` and a new ``AnnData`` is returned with the var axis collapsed to the unique ids. As with :func:`aver_arrays`, the sample-count vs gene-count shape change means in-place mutation via a layer is not possible, so AnnData-in returns a value. """ if _is_anndata(x): return _avereps_anndata(x, ID=ID) # EList dispatch (mirrors R's avereps.EList at dups.R:267-285). # Averages E and weights, deduplicates genes, drops printer. Default # ID is y$genes$ID, then rownames (we use the genes DataFrame index), # matching R's precedence. if isinstance(x, EList): return _avereps_elist(x, ID=ID) is_df = isinstance(x, pd.DataFrame) columns = x.columns if is_df else None if ID is None: if is_df: ID = np.asarray(x.index) else: raise ValueError( "No probe IDs: pass ID=... or supply x as a DataFrame " "whose index holds the probe identifiers. Matches R's " "'No probe IDs' stop() when both ID and rownames(x) are NULL." ) arr = np.asarray(x, dtype=np.float64) ID = np.asarray(ID) # Unique IDs in order of first appearance (matches R's # factor(ID, levels=unique(ID)) combined with rowsum(reorder=FALSE)). _, idx = np.unique(ID, return_index=True) unique_ids = ID[np.sort(idx)] result = np.zeros((len(unique_ids), arr.shape[1])) for i, uid in enumerate(unique_ids): mask = ID == uid if mask.sum() == 1: result[i, :] = arr[mask, :][0] else: result[i, :] = np.nanmean(arr[mask, :], axis=0) if is_df: return pd.DataFrame(result, index=unique_ids, columns=columns) return result
def _avereps_elist(el, ID=None): """EList dispatch for avereps (R: avereps.EList at dups.R:267-285). Averages ``E`` and ``weights`` across replicate probes, deduplicates ``genes``, and drops ``printer``. Default ``ID`` is ``y$genes$ID`` if present, otherwise the genes DataFrame's index, otherwise R's error. Returns a new EList with the probe axis collapsed. """ E = np.asarray(el["E"], dtype=np.float64) weights = el.get("weights") genes = el.get("genes") if ID is None: if isinstance(genes, pd.DataFrame) and "ID" in genes.columns: ID = genes["ID"].to_numpy() elif isinstance(genes, pd.DataFrame): ID = np.asarray(genes.index) else: raise ValueError( "No probe IDs: pass ID=... or populate EList['genes'] " "with an 'ID' column or a meaningful index." ) ID = np.asarray(ID) if ID.shape[0] != E.shape[0]: raise ValueError(f"length of ID ({ID.shape[0]}) must match number of probes ({E.shape[0]})") # Average E (delegate to the matrix path for the numerics). E_new = avereps(E, ID=ID) # Average weights if present (matches R's avereps.EList line 274). weights_new = None if weights is not None: w_arr = np.asarray(weights, dtype=np.float64) if w_arr.shape == E.shape: weights_new = avereps(w_arr, ID=ID) # Deduplicate genes (matches R's avereps.EList line 275-278). _, first_idx = np.unique(ID, return_index=True) keep = np.sort(first_idx) genes_new = None if isinstance(genes, pd.DataFrame): genes_new = genes.iloc[keep].reset_index(drop=True) # Build the new EList - copy original, overwrite collapsed slots, # drop 'printer' (R's avereps.EList line 280). out_dict = {k: v for k, v in el.items() if k != "printer"} out_dict["E"] = E_new if weights_new is not None: out_dict["weights"] = weights_new if genes_new is not None: out_dict["genes"] = genes_new return EList(out_dict) def _avereps_anndata(adata, ID=None): """AnnData dispatch for avereps. Averages over duplicate probes (var rows) identified by ``ID`` (defaulting to ``adata.var_names``) and returns a new AnnData with the var axis collapsed to the unique ids in order of first appearance. Sample (obs) axis is preserved. """ try: import anndata as ad except ImportError as exc: raise RuntimeError("anndata is required for avereps(AnnData) but is not installed") from exc # get_eawp densifies and transposes to limma's (n_probes, n_samples). eawp = get_eawp(adata) E = np.asarray(eawp["exprs"], dtype=np.float64) if ID is None: ID = np.asarray(adata.var_names) ID = np.asarray(ID) if ID.shape[0] != E.shape[0]: raise ValueError(f"length of ID ({ID.shape[0]}) must match number of probes ({E.shape[0]})") # Delegate the averaging to the ndarray path; output is # (n_unique_probes, n_samples). averaged = avereps(E, ID=ID) # First-occurrence ordering of the unique ids, for picking # representative var rows in the collapsed AnnData. _, first_idx = np.unique(ID, return_index=True) keep_order = np.sort(first_idx) new_var_names = np.asarray(ID)[keep_order] if adata.var is not None and len(adata.var.columns): var_new = adata.var.iloc[keep_order].copy() else: var_new = pd.DataFrame(index=pd.Index(new_var_names)) var_new.index = pd.Index(new_var_names) obs_new = adata.obs.copy() if adata.obs is not None else None # averaged is (n_unique_probes, n_samples); AnnData wants # (n_samples, n_vars). return ad.AnnData(X=np.asarray(averaged).T, obs=obs_new, var=var_new)
[docs] def duplicate_correlation( M, design: np.ndarray | None = None, ndups: int = 2, spacing: int | str = 1, block: np.ndarray | None = None, trim: float = 0.15, weights: np.ndarray | None = None, *, layer: str | None = None, weights_layer: str | None = None, ) -> dict: """ Estimate correlation between duplicate spots or blocked samples. Estimates the intra-block correlation using a mixed linear model, computed separately for each gene and then averaged using Fisher's z-transformation. Parameters ---------- M : ndarray Expression matrix, shape (n_genes, n_samples) or (n_spots, n_arrays). design : ndarray, optional Design matrix. If None, uses intercept-only model. ndups : int, default 2 Number of within-array duplicate spots. spacing : int or str, default 1 Spacing between duplicates. Can be an integer or one of: - "columns": spacing of 1 (duplicates in adjacent rows) - "topbottom": spacing of n_spots/2 (duplicates in top/bottom halves) block : array_like, optional Block indicator for correlated samples. If provided, ndups and spacing are ignored. trim : float, default 0.15 Trimmed mean proportion for consensus correlation. weights : ndarray, optional Observation weights. Returns ------- dict consensus_correlation : float Consensus correlation (trimmed mean on Fisher z scale). cor : float Same as consensus_correlation (for compatibility). atanh_correlations : ndarray Gene-wise correlations on Fisher z scale. Notes ----- This function uses REML estimation of variance components in a mixed linear model. It requires the MixedLM functionality from statsmodels. The consensus correlation should be used as the `correlation` argument to `gls_series()`. References ---------- Smyth, G. K., Michaud, J. and Scott, H. S. (2005). Use of within-array replicate spots for assessing differential expression in microarray experiments. Bioinformatics, 21, 2067-2075. """ from scipy.stats import trim_mean # Polymorphic input: ndarray / dict / EList / AnnData. eawp = get_eawp(M, layer=layer, weights_layer=weights_layer) M = np.asarray(eawp["exprs"], dtype=np.float64) if design is None and eawp.get("design") is not None: design = np.asarray(eawp["design"], dtype=np.float64) if weights is None and eawp.get("weights") is not None: weights = np.asarray(eawp["weights"], dtype=np.float64) n_genes, n_arrays = M.shape # Handle spacing shortcuts (R parity) if isinstance(spacing, str): if spacing == "columns": spacing = 1 elif spacing == "topbottom": spacing = n_genes // 2 else: raise ValueError(f"Unknown spacing shortcut: {spacing}. Use 'columns' or 'topbottom'.") # Check design if design is None: design = np.ones((n_arrays, 1)) design = np.asarray(design, dtype=np.float64) n_coefs = design.shape[1] # Normalise weight shape through asMatrixWeights; helper returns # a fresh copy so the subsequent NaN writes are safe. if weights is not None: from .classes import as_matrix_weights weights = as_matrix_weights(weights, (n_genes, n_arrays)) weights[weights <= 0] = np.nan M = M.copy() M[~np.isfinite(weights)] = np.nan # Check if block is already encoded in design matrix if block is not None: # One-hot encode block (sans intercept, like R's model.matrix) unique_blocks = np.unique(block) if len(unique_blocks) > 1: block_indicators = np.zeros((len(block), len(unique_blocks) - 1)) for j, b in enumerate(unique_blocks[1:]): block_indicators[:, j] = (block == b).astype(float) # QR decomposition of design q, r = np.linalg.qr(design, mode="complete") rank = np.sum(np.abs(np.diag(r[: design.shape[1], :])) > 1e-10) # Project block indicators through Q' qt_block = q.T @ block_indicators # Check if projection below design rank is essentially zero if rank < q.shape[0] and np.max(np.abs(qt_block[rank:, :])) < 1e-8: warnings.warn( "Block factor already encoded in the design matrix: " "setting intrablock correlation to zero." ) return { "consensus_correlation": 0.0, "cor": 0.0, "atanh_correlations": np.zeros(n_genes), } # Setup blocking structure if block is None: # Within-array duplicates if ndups < 2: warnings.warn("No duplicates: setting correlation to zero.") return { "consensus_correlation": 0.0, "cor": 0.0, "atanh_correlations": np.zeros(n_genes), } # Unwrap duplicates M = unwrap_dups(M, ndups=ndups, spacing=spacing) if weights is not None: weights = unwrap_dups(weights, ndups=ndups, spacing=spacing) design = np.kron(design, np.ones((ndups, 1))) n_genes, n_samples = M.shape # Array indicator (block = array) Array = np.repeat(np.arange(n_arrays), ndups) else: # Sample blocking block = np.asarray(block) if len(block) != n_arrays: raise ValueError("Length of block does not match number of arrays") Array = block # Check for singleton blocks unique_blocks, counts = np.unique(block, return_counts=True) max_block_size = np.max(counts) if max_block_size == 1: warnings.warn("Blocks all of size 1: setting correlation to zero.") return { "consensus_correlation": 0.0, "cor": 0.0, "atanh_correlations": np.zeros(n_genes), } # Compute gene-wise correlations using mixed model rho = np.full(n_genes, np.nan) for i in range(n_genes): y = M[i, :] obs = np.isfinite(y) n_obs = np.sum(obs) if n_obs <= n_coefs + 2: continue y_obs = y[obs] X = design[obs, :] groups = Array[obs] unique_groups = np.unique(groups) n_groups = len(unique_groups) if n_groups <= 1 or n_groups >= n_obs - 1: continue # Build Z matrix (block indicator, like R's model.matrix(~0+A)) Z = np.zeros((n_obs, n_groups)) for j, g in enumerate(unique_groups): Z[:, j] = (groups == g).astype(float) # Get observation weights if available w = None if weights is not None: w = weights[i, obs] if weights.ndim == 2 else weights[obs] if np.any(~np.isfinite(w)) or np.any(w <= 0): w = None # Fit mixed model using statmod-compatible algorithm result = _mixed_model_2_fit(y_obs, X, Z, w=w, maxit=20) if result is not None: var_resid, var_block = result total_var = var_resid + var_block if total_var > 0: rho[i] = var_block / total_var # Bound correlations to ensure positive-definite correlation matrix rho_max = 0.99 if block is None: rho_min = 1 / (1 - ndups) + 0.01 else: rho_min = 1 / (1 - max_block_size) + 0.01 rho = np.clip(rho, rho_min, rho_max) # Fisher z-transformation arho = np.arctanh(rho) # Trimmed mean on z scale valid = np.isfinite(arho) if np.sum(valid) > 0: mrho = np.tanh(trim_mean(arho[valid], trim)) else: mrho = 0.0 return { "consensus_correlation": float(mrho), "cor": float(mrho), "atanh_correlations": arho, }