Source code for pylimma.classes

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
#   classes.R     Copyright (C) 2003-2022 Gordon Smyth
#   subsetting.R  Copyright (C) 2013-2020 Gordon Smyth
#   lmfit.R       Copyright (C) 2008-2022 Gordon Smyth (getEAWP only)
# Python port: Copyright (C) 2026 John Mulvey
"""
Lightweight data classes and polymorphic input/output dispatchers for pylimma.

Exports
-------
EList           : input wrapper - dict subclass with E, weights, genes, targets, design.
MArrayLM        : fit-result wrapper - dict subclass holding linear-model fits.
get_eawp        : polymorphic input dispatcher, port of R limma's getEAWP().
put_eawp        : polymorphic output dispatcher. No R counterpart - see note below.

Why both classes subclass dict
------------------------------
Every key is reachable via both result["key"] and result.key. The dict-subclass
design preserves backward compatibility for any caller that treats results as
dicts, including all existing R-parity tests.

Why put_eawp exists when R has no counterpart
---------------------------------------------
R achieves input-class-preserving output through S3/S4 method dispatch on the
input class - normalizeBetweenArrays.EList vs .matrix vs .RGList are separate
methods, each knowing what class to return because it only ever sees one input
type. Python lacks that machinery. To get EList-in EList-out, ndarray-in
ndarray-out, and AnnData-in AnnData-mutate from a single Python function, we
need an explicit output-side dispatcher. Without put_eawp we would re-duplicate
the very pattern get_eawp was built to eliminate, just at the tail of every
shape-preserving function.
"""

from __future__ import annotations

from copy import deepcopy
from typing import Any

import numpy as np
import pandas as pd

# ----------------------------------------------------------------------------
# Weight-shape dispatcher (port of R limma's asMatrixWeights)
# ----------------------------------------------------------------------------


def as_matrix_weights(
    weights,
    dim: tuple[int, int] | None = None,
) -> np.ndarray:
    """
    Normalise a weight specification to a fresh ``(n_probes, n_arrays)``
    matrix.

    Port of R limma's private ``asMatrixWeights``
    (``limma/R/weights.R:57-88``). Accepts every shape R does:

    1. full ``(G, N)`` matrix - returned unchanged (copy).
    2. ``(1, N)`` row matrix with ``N == dim[1]`` - broadcast down rows.
    3. scalar / length-1 - broadcast to the full ``(G, N)``.
    4. length-G vector (**probe weights**) - broadcast across columns.
    5. length-N vector (**array weights**) - broadcast across rows.
    6. anything else - ``ValueError("weights is of unexpected size")``.

    The result is always freshly allocated so downstream ``weights[...] = ...``
    writes do not leak into the caller's memory (see
    ``known_diff_weights_mutation.md``).

    R's branch order is preserved verbatim. In particular, if ``G == N``
    a length-G vector is treated as probe weights (branch 4), matching
    R's ambiguity resolution.

    R also tags the output with ``attr(weights, "arrayweights") <- TRUE``
    when it fills branches 2 or 5. pylimma does not propagate that
    attribute because the ``lm_series`` / ``mrlm`` / ``gls_series`` fast-
    path dispatchers value-check ``weights[0] == weights[i]`` at runtime
    instead (see ``lmfit.py`` ``has_probe_weights`` branch).

    Parameters
    ----------
    weights : scalar, 1-D, or 2-D array-like
        Weight specification.
    dim : tuple of (int, int) or None
        Target shape ``(n_probes, n_arrays)``. When ``None`` the input
        is returned as a fresh 2-D matrix (column-matrix for 1-D input,
        matching R's ``as.matrix`` on a vector).

    Returns
    -------
    ndarray
        Matrix of shape ``dim`` (or the input's 2-D shape if ``dim`` is
        ``None``), always a fresh copy.
    """
    arr = np.asarray(weights, dtype=np.float64)
    if arr.ndim == 0:
        arr = arr.reshape(1, 1)
    elif arr.ndim == 1:
        # Mirror R's as.matrix(vector): column matrix (K, 1).
        arr = arr.reshape(-1, 1)
    elif arr.ndim != 2:
        raise ValueError("weights must be scalar, 1-D, or 2-D")

    if dim is None:
        return arr.copy()

    if len(dim) < 2:
        raise ValueError("dim must be a length-2 shape")
    target = (int(round(dim[0])), int(round(dim[1])))
    if target[0] < 1 or target[1] < 1:
        raise ValueError("zero or negative dimensions not allowed")

    dw = arr.shape

    # Full matrix already.
    if dw == target:
        return arr.copy()

    if min(dw) != 1:
        raise ValueError("weights is of unexpected shape")

    # Row matrix of array weights: (1, N) with N == target[1] > 1.
    if dw[1] > 1 and dw[1] == target[1]:
        return np.broadcast_to(arr, target).copy()

    lw = int(np.prod(dw))

    # Scalar / length-G probe weights. Broadcast column-wise.
    if lw == 1 or lw == target[0]:
        return np.broadcast_to(arr.reshape(-1, 1), target).copy()

    # Length-N array weights. Broadcast row-wise.
    if lw == target[1]:
        return np.broadcast_to(arr.reshape(1, -1), target).copy()

    raise ValueError("weights is of unexpected size")


# ----------------------------------------------------------------------------
# Slot-classification tables (port of subsetting.R:84-103 and :107-182)
# ----------------------------------------------------------------------------

_ELIST_IJ = ("E", "Eb", "weights")
_ELIST_IX = ("genes",)
_ELIST_JX = ("targets", "design")
_ELIST_I: tuple[str, ...] = ()

_MARRAYLM_IJ = (
    "coefficients",
    "stdev_unscaled",
    "t",
    "p_value",
    "lods",
    "weights",
)
_MARRAYLM_IX = ("genes",)
_MARRAYLM_JX: tuple[str, ...] = ()
_MARRAYLM_I = (
    "Amean",
    "sigma",
    "df_residual",
    "df_prior",
    "df_total",
    "s2_post",
    "F",
    "F_p_value",
)


# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------


def _is_anndata(obj: Any) -> bool:
    try:
        from anndata import AnnData
    except ImportError:
        return False
    return isinstance(obj, AnnData)


def _resolve_index(idx, names):
    """Resolve a row/column selector to an integer array or slice.

    Accepts bool masks, int arrays, string lists (resolved against names),
    or slices. Returns a form usable as a numpy index.
    """
    if idx is None:
        return slice(None)
    if isinstance(idx, slice):
        return idx
    arr = np.atleast_1d(np.asarray(idx))
    if arr.dtype == bool:
        return np.where(arr)[0]
    if arr.dtype.kind in ("U", "S", "O") and names is not None:
        names_list = list(names)
        out = []
        for name in arr:
            if name not in names_list:
                raise KeyError(f"Subscript not found: {name!r}")
            out.append(names_list.index(name))
        return np.asarray(out, dtype=int)
    return arr.astype(int)


def _slice_2d(x, i, j):
    """Slice a 2D array-like by rows then columns, preserving 2D-ness."""
    if x is None:
        return None
    if isinstance(x, pd.DataFrame):
        return (
            x.iloc[i, :].iloc[:, j]
            if not isinstance(j, slice) or j != slice(None)
            else x.iloc[i, :]
        )
    arr = np.asarray(x)
    if arr.ndim == 1:
        return arr[i]
    return (
        arr[np.ix_(np.atleast_1d(i), np.atleast_1d(j))]
        if not (isinstance(i, slice) or isinstance(j, slice))
        else arr[i, :][:, j]
    )


def _slice_rows(x, i):
    if x is None:
        return None
    if isinstance(x, pd.DataFrame):
        return x.iloc[i]
    arr = np.asarray(x)
    if arr.ndim == 1:
        return arr[i]
    return arr[i, :] if isinstance(i, slice) else arr[np.atleast_1d(i), :]


def _slice_cols(x, j):
    if x is None:
        return None
    if isinstance(x, pd.DataFrame):
        return x.iloc[j]
    arr = np.asarray(x)
    if arr.ndim == 1:
        return arr[j]
    return arr[j, :] if isinstance(j, slice) else arr[np.atleast_1d(j), :]


def _slice_1d(x, i):
    if x is None:
        return None
    arr = np.asarray(x)
    return arr[i] if isinstance(i, slice) else arr[np.atleast_1d(i)]


def _print_head(x, name: str, max_rows: int = 5) -> str:
    """R-style printHead - truncated display of a component."""
    if x is None:
        return f"${name}\nNULL\n"
    if isinstance(x, np.ndarray):
        if x.ndim == 2:
            n = x.shape[0]
            shown = x[: min(max_rows, n)]
            lines = [f"${name}", str(shown)]
            if n > max_rows:
                lines.append(f"{n - max_rows} more rows ...")
            return "\n".join(lines) + "\n"
        elif x.ndim == 1:
            n = x.shape[0]
            shown = x[: min(max_rows * 4, n)]
            lines = [f"${name}", str(shown)]
            if n > max_rows * 4:
                lines.append(f"{n - max_rows * 4} more elements ...")
            return "\n".join(lines) + "\n"
    if isinstance(x, pd.DataFrame):
        n = len(x)
        lines = [f"${name}", str(x.head(max_rows))]
        if n > max_rows:
            lines.append(f"{n - max_rows} more rows ...")
        return "\n".join(lines) + "\n"
    return f"${name}\n{x}\n"


# ----------------------------------------------------------------------------
# Base class
# ----------------------------------------------------------------------------


class _LargeDataObject(dict):
    """Shared behaviour for EList and MArrayLM.

    Subclasses must define:
        _matrix_key : str
            Name of the primary (genes x samples) matrix slot.
        _slot_ij    : tuple[str, ...]   - 2D slots sliced by both i and j
        _slot_ix    : tuple[str, ...]   - slots sliced by i only (row-aligned)
        _slot_jx    : tuple[str, ...]   - slots sliced by j only (col-aligned)
        _slot_i     : tuple[str, ...]   - 1D slots sliced by i (one per gene)
    """

    _matrix_key: str = ""
    _slot_ij: tuple[str, ...] = ()
    _slot_ix: tuple[str, ...] = ()
    _slot_jx: tuple[str, ...] = ()
    _slot_i: tuple[str, ...] = ()

    # ---- attribute access ----

    def __getattr__(self, name: str):
        try:
            return self[name]
        except KeyError as e:
            raise AttributeError(f"{type(self).__name__!s} has no attribute {name!r}") from e

    def __setattr__(self, name: str, value) -> None:
        self[name] = value

    def __delattr__(self, name: str) -> None:
        try:
            del self[name]
        except KeyError as e:
            raise AttributeError(f"{type(self).__name__!s} has no attribute {name!r}") from e

    # ---- dimensions ----

    @property
    def shape(self) -> tuple[int, int]:
        m = self.get(self._matrix_key)
        if m is None:
            return (0, 0)
        arr = np.asarray(m) if not hasattr(m, "shape") else m
        if arr.ndim == 1:
            return (arr.shape[0], 1)
        return arr.shape[:2]

    def dim(self) -> tuple[int, int]:
        return self.shape

    @property
    def nrow(self) -> int:
        return self.shape[0]

    @property
    def ncol(self) -> int:
        return self.shape[1]

    def __len__(self) -> int:
        return dict.__len__(self)

    @property
    def dimnames(self) -> tuple[list | None, list | None]:
        m = self.get(self._matrix_key)
        if isinstance(m, pd.DataFrame):
            return (list(m.index), list(m.columns))
        genes = self.get("genes")
        targets = self.get("targets")
        row = None
        col = None
        if genes is not None and isinstance(genes, pd.DataFrame):
            row = list(genes.index)
        if targets is not None and isinstance(targets, pd.DataFrame):
            col = list(targets.index)
        return (row, col)

    # ---- head / tail ----

    def head(self, n: int = 6) -> "_LargeDataObject":
        n = min(max(n, 0), self.nrow)
        return self._subset(slice(0, n), slice(None))

    def tail(self, n: int = 6) -> "_LargeDataObject":
        n = min(max(n, 0), self.nrow)
        start = max(self.nrow - n, 0)
        return self._subset(slice(start, self.nrow), slice(None))

    # ---- subsetting ----

    def __getitem__(self, key):
        if isinstance(key, tuple) and len(key) == 2:
            return self._subset(key[0], key[1])
        return dict.__getitem__(self, key)

    def __setitem__(self, key, value) -> None:
        if isinstance(key, tuple):
            raise TypeError("Tuple subscripts are read-only on " + type(self).__name__)
        dict.__setitem__(self, key, value)

    def _subset(self, i, j) -> "_LargeDataObject":
        row_names, col_names = self.dimnames
        i_idx = _resolve_index(i, row_names)
        j_idx = _resolve_index(j, col_names)

        out = deepcopy(self)

        for k in self._slot_ij:
            v = out.get(k)
            if v is not None:
                out[k] = _slice_2d(v, i_idx, j_idx)
        for k in self._slot_ix:
            v = out.get(k)
            if v is not None:
                out[k] = _slice_rows(v, i_idx)
        for k in self._slot_jx:
            v = out.get(k)
            if v is not None:
                out[k] = _slice_rows(v, j_idx)
        for k in self._slot_i:
            v = out.get(k)
            if v is not None and np.asarray(v).size > 1:
                out[k] = _slice_1d(v, i_idx)

        return out

    # ---- repr ----

    def __repr__(self) -> str:
        lines = [f'An object of class "{type(self).__name__}"']
        for name, value in self.items():
            lines.append(_print_head(value, name))
        return "\n".join(lines).rstrip() + "\n"


# ----------------------------------------------------------------------------
# Public classes
# ----------------------------------------------------------------------------


[docs] class EList(_LargeDataObject): """Expression-list container (Python equivalent of R limma's EList). Required slot: E (2D numeric, genes x samples). Optional slots: weights, genes, targets, design, plus arbitrary extras. Examples -------- >>> el = EList({"E": np.random.randn(100, 8), "design": np.eye(8)}) >>> el.shape (100, 8) >>> el[:10, :].shape (10, 8) """ _matrix_key = "E" _slot_ij = _ELIST_IJ _slot_ix = _ELIST_IX _slot_jx = _ELIST_JX _slot_i = _ELIST_I
[docs] def __init__(self, data=None, /, **kwargs): if data is None: data = {} if isinstance(data, EList): super().__init__(dict(data)) elif isinstance(data, dict): super().__init__(data) else: raise TypeError(f"EList expected dict or EList, got {type(data).__name__}") for k, v in kwargs.items(): self[k] = v
[docs] class MArrayLM(_LargeDataObject): """Linear-model-fit container (Python equivalent of R limma's MArrayLM). Holds the output of lm_fit / contrasts_fit / e_bayes / treat. """ _matrix_key = "coefficients" _slot_ij = _MARRAYLM_IJ _slot_ix = _MARRAYLM_IX _slot_jx = _MARRAYLM_JX _slot_i = _MARRAYLM_I
[docs] def __init__(self, data=None, /, **kwargs): if data is None: data = {} if isinstance(data, MArrayLM): super().__init__(dict(data)) elif isinstance(data, dict): super().__init__(data) else: raise TypeError(f"MArrayLM expected dict or MArrayLM, got {type(data).__name__}") for k, v in kwargs.items(): self[k] = v
# ---- R-parity convenience methods (lmfit.R, classes.R) ----
[docs] def fitted(self) -> np.ndarray: """ Fitted values ``coefficients @ design.T``. Port of R limma's ``fitted.MArrayLM`` (``lmfit.R``). Raises when the fit holds contrasts rather than raw coefficients. """ if self.get("contrasts") is not None: raise ValueError( "Object contains contrasts rather than coefficients, " "so fitted values cannot be computed." ) design = self.get("design") coef = self.get("coefficients") if design is None or coef is None: raise ValueError("Fit must contain coefficients and design") return np.asarray(coef) @ np.asarray(design).T
[docs] def residuals(self, y) -> np.ndarray: """ Residuals ``y - fitted``. Port of R limma's ``residuals.MArrayLM`` (``lmfit.R``). """ return np.asarray(y) - self.fitted()
[docs] def as_dataframe(self, row_names=None) -> pd.DataFrame: """ Flatten the fit into a ``pandas.DataFrame`` with one row per probe. Port of R limma's ``as.data.frame.MArrayLM`` (``classes.R``). Only slots whose first dimension matches the number of probes are retained. """ if self.get("coefficients") is None: import warnings as _w _w.warn("NULL coefficients, returning empty data.frame") return pd.DataFrame() coef = np.asarray(self["coefficients"]) if coef.ndim == 1: coef = coef.reshape(-1, 1) nprobes, ncoef = coef.shape columns: dict[str, np.ndarray] = {} for name, value in self.items(): arr = np.asarray(value) if value is not None else None if arr is None: continue if arr.ndim == 0 or arr.size == 0: continue if arr.shape[0] != nprobes: continue if arr.ndim == 1: columns[name] = arr else: for j in range(arr.shape[1]): col_name = f"{name}.{j + 1}" if arr.shape[1] > 1 else name columns[col_name] = arr[:, j] return pd.DataFrame(columns, index=row_names)
# ---------------------------------------------------------------------------- # Polymorphic input dispatcher # ---------------------------------------------------------------------------- _UNSUPPORTED_WRAPPER_NAMES = { "RGList", "MAList", "EListRaw", "ExpressionSet", "eSet", "PLMset", "marrayNorm", "DGEList", }
[docs] def get_eawp( obj, *, layer: str | None = None, weights_layer: str | None = None, ) -> dict: """Polymorphic input dispatcher - port of R limma's getEAWP(). Accepts a variety of expression-data containers and returns a plain dict with R-parity keys. The returned dict is consumed by linear-modelling functions; it is deliberately not wrapped in an EList. Parameters ---------- obj : ndarray, DataFrame, dict, EList, or AnnData Expression data. Dict-with-'E' is accepted for backwards compatibility and is treated identically to an EList. layer : str, optional AnnData-only. Name of an adata.layers[...] entry to use in place of X. weights_layer : str, optional AnnData-only. Name of an adata.layers[...] entry to read as observation weights. When ``None`` (default) and ``layer`` ends in ``"_E"``, the companion layer ``{layer[:-2]}_weights`` is auto-loaded if it exists (the voom/vooma convention); when set explicitly, the named layer is read instead. Use this when you've customised :func:`voom`'s ``weights_layer=`` output name on the write side and need the read side to match. Returns ------- dict With keys: exprs (numpy 2D, genes x samples), weights, probes, Amean, design, targets. Missing components are None. Raises ------ TypeError If obj is of an unsupported class. Two-channel microarray wrappers (RGList, MAList, EListRaw) and Bioconductor S4 containers (ExpressionSet, eSet, PLMset, marrayNorm) are deliberately out of scope - see policy_data_class_wrappers in project memory. """ if obj is None: raise TypeError("data object is None") y: dict[str, Any] = { "exprs": None, "weights": None, "probes": None, "Amean": None, "design": None, "targets": None, } if layer is not None and not _is_anndata(obj): raise TypeError( f"layer={layer!r} is only supported for AnnData input; " f"got {type(obj).__name__}. For EList / dict / ndarray / " "DataFrame inputs, pass the data directly from the slot " "you intended." ) if weights_layer is not None and not _is_anndata(obj): raise TypeError( f"weights_layer={weights_layer!r} is only supported for " f"AnnData input; got {type(obj).__name__}. For EList / dict / " "ndarray / DataFrame inputs, pass weights= directly." ) if _is_anndata(obj): import scipy.sparse as _sp def _densify(mat): # scanpy workflows routinely store counts as scipy.sparse; # np.asarray on sparse yields a 0-d object array, so # densify explicitly first. if _sp.issparse(mat): mat = mat.toarray() return np.asarray(mat, dtype=np.float64) adata = obj if layer is not None: mat = adata.layers[layer] else: mat = adata.X y["exprs"] = _densify(mat).T # Capture adata.var / adata.obs as full DataFrames even when they # have zero annotation columns - the index still carries # var_names / obs_names, and consumers (top_table genelist, # fit['targets'], diagnostic plots) care about that index. # Mirrors the DataFrame branch's non-RangeIndex capture logic. if adata.var is not None: y["probes"] = adata.var if adata.obs is not None: y["targets"] = adata.obs y["Amean"] = np.nanmean(y["exprs"], axis=1) # Load companion weights. If the caller passed weights_layer= # explicitly, use that layer verbatim. Otherwise fall back to # the voom/vooma convention: when layer ends in "_E", look for # the companion layer {stem}_weights. AnnData layers are stored # in n_samples x n_genes orientation; transpose to limma's # n_genes x n_samples. Also pick up the design that voom/vooma # stashed at adata.uns[stem]["design"] so downstream consumers # (notably lm_fit) see it through the same eawp["design"] slot # that EList input populates, matching R's # if(is.null(design)) design <- y$design # one-liner. if weights_layer is not None: if weights_layer not in adata.layers: raise KeyError( f"weights_layer={weights_layer!r} not found in " f"adata.layers (available: {list(adata.layers)})" ) y["weights"] = _densify(adata.layers[weights_layer]).T if layer is not None and layer.endswith("_E"): stem = layer[:-2] if weights_layer is None: weights_candidate = f"{stem}_weights" if weights_candidate in adata.layers: y["weights"] = _densify(adata.layers[weights_candidate]).T uns_payload = adata.uns.get(stem) if isinstance(uns_payload, dict): if uns_payload.get("design") is not None: y["design"] = uns_payload["design"] return y if isinstance(obj, EList): return _eawp_from_elist_like(obj) cls_name = type(obj).__name__ if cls_name in _UNSUPPORTED_WRAPPER_NAMES: raise TypeError( f"{cls_name} is not supported by pylimma. " "Two-channel and Bioconductor S4 wrappers are out of scope " "(policy_data_class_wrappers). Extract the expression matrix " "and pass it directly, or wrap it in pylimma.EList." ) if isinstance(obj, dict): if "E" in obj: return _eawp_from_elist_like(obj) if "exprs" in obj: y.update({k: obj.get(k) for k in y.keys() if k in obj}) if y["exprs"] is not None: y["exprs"] = np.asarray(y["exprs"], dtype=np.float64) if y["Amean"] is None: y["Amean"] = np.nanmean(y["exprs"], axis=1) return y raise TypeError( "dict input to get_eawp must contain key 'E' (EList-style) " "or 'exprs' (getEAWP-output-style)" ) if isinstance(obj, pd.DataFrame): numeric_cols = obj.dtypes.apply(lambda dt: np.issubdtype(dt, np.number)) if numeric_cols.all(): y["exprs"] = obj.values.astype(np.float64) elif (~numeric_cols).sum() == 1 and len(obj.columns) > 1: y["exprs"] = obj.iloc[:, 1:].values.astype(np.float64) y["probes"] = obj.iloc[:, [0]] else: raise TypeError( "DataFrame input must be all-numeric or have exactly one " "non-numeric column (treated as gene IDs)" ) # Mirror R's getEAWP: if the input has non-default row names, # wrap them as a one-column probes DataFrame so gene names # propagate into lm_fit / top_table. Skip when the index is # the default RangeIndex (no meaningful names set). if y["probes"] is None and not isinstance(obj.index, pd.RangeIndex): y["probes"] = pd.DataFrame( {"ProbeID": obj.index.astype(str).values}, index=obj.index, ) y["Amean"] = np.nanmean(y["exprs"], axis=1) return y arr = np.asarray(obj) if arr.ndim == 1: arr = arr.reshape(1, -1) if arr.ndim != 2: raise TypeError(f"expression data must be 2-dimensional, got {arr.ndim}D") if not np.issubdtype(arr.dtype, np.number): raise TypeError("expression data must be numeric") y["exprs"] = arr.astype(np.float64, copy=False) y["Amean"] = np.nanmean(y["exprs"], axis=1) return y
def _eawp_from_elist_like(obj) -> dict: """Extract EAWP dict from an EList or EList-shaped dict.""" exprs = np.asarray(obj["E"], dtype=np.float64) return { "exprs": exprs, "weights": obj.get("weights"), "probes": obj.get("genes"), "Amean": np.nanmean(exprs, axis=1), "design": obj.get("design"), "targets": obj.get("targets"), } # ---------------------------------------------------------------------------- # Polymorphic output dispatcher # ----------------------------------------------------------------------------
[docs] def put_eawp( slots: dict, original, *, out_layer: str = "pylimma_E", weights_layer: str | None = "pylimma_weights", uns_key: str = "pylimma", single_matrix: bool = False, ): """Polymorphic output dispatcher - package a result in a form matching the original input's class. This has no direct R counterpart; see module docstring for rationale. Parameters ---------- slots : dict Updated slot values. Typically {'E': ..., 'weights': ..., 'design': ..., ...}. 'E' is required. original : the object originally passed in by the user Determines the output format. out_layer : str For AnnData input, the layer name to which the new E is written. weights_layer : str or None For AnnData input, the layer name for weights. If None, weights are placed inside adata.uns[uns_key] instead. uns_key : str For AnnData input, the adata.uns[...] key for ancillary metadata (design, lib_size, span, targets, etc.). single_matrix : bool, default False When True (appropriate for functions whose only meaningful output is the transformed expression matrix, e.g. normalize_between_arrays), ndarray input returns a bare ndarray. When False (the default, for functions with multi-slot output like voom), ndarray input returns a plain dict containing all slots - matching the pre-existing convention. Returns ------- np.ndarray, dict, EList, or None - ndarray when original is ndarray and single_matrix=True. - dict when original is ndarray (default) or a plain dict. - EList when original is an EList. - None when original is AnnData (side-effects on adata). Notes ----- **AnnData view semantics.** If ``original`` is a view (``adata[:, mask]``), writing to ``adata.layers`` / ``adata.uns`` triggers anndata's ``ImplicitModificationWarning`` and actualises the view into a standalone copy. The write lands on that copy; the parent AnnData is untouched. This matches scanpy's convention and is not a pylimma-specific behaviour. To operate in place on a subset, call ``adata[:, mask].copy()`` first or subset the parent explicitly. A bare one-liner like ``pylimma.voom(adata[:, mask])`` discards the actualised copy along with the return value; bind the view first (``view = adata[:, mask]; pylimma.voom(view)``) to keep the result reachable. """ if "E" not in slots: raise ValueError("put_eawp: slots must contain 'E'") if _is_anndata(original): adata = original E = np.asarray(slots["E"]) adata.layers[out_layer] = E.T if E.ndim == 2 else E if slots.get("weights") is not None and weights_layer is not None: # Normalise weights to a (n_genes, n_samples) matrix before # writing. Covers scalar / length-G probe / length-N array / # (1, N) row / full (G, N) shapes via as_matrix_weights; # without this a 1-D caller would write a 1-D layer and # anndata would reject it. Today's public callers (voom, # vooma, voom_with_quality_weights) all produce 2-D weights # so this is a defensive fix for future callers. W = as_matrix_weights(slots["weights"], E.shape) adata.layers[weights_layer] = W.T uns_payload = { k: v for k, v in slots.items() if k not in ("E",) and v is not None and not (k == "weights" and weights_layer is not None) } if uns_payload: adata.uns[uns_key] = uns_payload return None if isinstance(original, EList): out = EList(dict(original)) for k, v in slots.items(): if v is not None: out[k] = v return out if isinstance(original, dict): out = dict(original) for k, v in slots.items(): if v is not None: out[k] = v return out # ndarray (or anything else coerced to array) if single_matrix: return np.asarray(slots["E"]) return {k: v for k, v in slots.items() if v is not None}
# ---------------------------------------------------------------------------- # Helper for fit-consuming functions (e_bayes, contrasts_fit, top_table, ...) # ---------------------------------------------------------------------------- def _resolve_fit_input(data, key: str): """Resolve the fit input for functions that consume an existing fit. Returns ------- (fit, adata, adata_key) fit: the MArrayLM / dict to operate on. adata: the AnnData if input was AnnData, else None. adata_key: the uns[] key, if input was AnnData, else None. When adata is not None, the caller must write the result back to adata.uns[adata_key] and return None. """ if _is_anndata(data): if key not in data.uns: raise ValueError( f"No fit results found in adata.uns[{key!r}]. Did you run lm_fit() first?" ) return data.uns[key], data, key return data, None, None