Source code for pylimma.weights

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This module is a Python port of code from R limma. Original R copyrights:
#   arrayWeights.R            Copyright (C) 2005-2019 Matt Ritchie, Cynthia Liu,
#                                                     Gordon Smyth
#   arrayWeightsREML.R        Copyright (C) 2005-2019 Gordon Smyth
#   arrayWeightsPrWtsREML.R   Copyright (C) 2019      Gordon Smyth
#   arrayWeightsGeneByGene.R  Copyright (C) 2005-2020 Matt Ritchie, Cynthia Liu,
#                                                     Gordon Smyth
#   arrayWeightsQuick.R       Copyright (C) 2004      Gordon Smyth
#   weights.R (modifyWeights) Copyright (C) 2003-2020 Gordon Smyth
# Python port: Copyright (C) 2026 John Mulvey
"""
Array weights for limma.

Implements functions to estimate sample-specific quality weights:
- array_weights(): Main entry point for estimating array quality weights
- array_weights_quick(): Quick approximation to array weights
- modify_weights(): Multiply rows of a weights matrix by status-specific
  scalars
"""

from __future__ import annotations

import numpy as np
from scipy import linalg

from .classes import get_eawp


def _contr_sum(n: int) -> np.ndarray:
    """
    Create sum-to-zero contrast matrix.

    Equivalent to R's contr.sum(n).

    Parameters
    ----------
    n : int
        Number of levels.

    Returns
    -------
    ndarray
        Contrast matrix of shape (n, n-1).
    """
    if n < 2:
        raise ValueError("Need at least 2 levels for contrast coding")
    # Identity matrix minus last row
    mat = np.eye(n, n - 1)
    mat[-1, :] = -1
    return mat


def _hat_values(qr_result: tuple) -> np.ndarray:
    """
    Compute hat values (diagonal of hat matrix) from QR decomposition.

    Parameters
    ----------
    qr_result : tuple
        Result from np.linalg.qr or scipy.linalg.qr.

    Returns
    -------
    ndarray
        Hat values.
    """
    Q = qr_result[0]
    # Hat values are row sums of squares of Q
    return np.sum(Q**2, axis=1)


def _array_weights_gene_by_gene(
    E: np.ndarray,
    design: np.ndarray,
    weights: np.ndarray | None,
    var_design: np.ndarray,
    prior_n: float = 10,
    trace: bool = False,
) -> np.ndarray:
    """
    Estimate array variances via gene-by-gene update algorithm.

    Internal function implementing the gene-by-gene method for array weights.

    Parameters
    ----------
    E : ndarray
        Expression matrix, shape (n_genes, n_samples).
    design : ndarray
        Design matrix, shape (n_samples, n_coefficients).
    weights : ndarray or None
        Prior observation weights, shape (n_genes, n_samples).
    var_design : ndarray
        Variance design matrix (columns sum to zero).
    prior_n : float
        Prior sample size for regularization.

    Returns
    -------
    ndarray
        Array weights, shape (n_samples,).
    """
    n_genes, n_samples = E.shape

    # Z matrix: intercept + var_design
    Z = np.column_stack([np.ones(n_samples), var_design])
    n_gam = var_design.shape[1]

    # Initialize array gammas to zero (with prior weight)
    gam = np.zeros(n_gam)
    aw = np.ones(n_samples)

    # Prior information matrix
    info2 = prior_n * var_design.T @ var_design

    # trace: report weight range roughly 10 times across genes (matches R)
    if trace:
        print("gene range(w)")
        report_interval = max(n_genes // 10, 1)

    # Step through genes
    for i in range(n_genes):
        # Combine array weights with observation weights
        if weights is None:
            w = aw.copy()
        else:
            w = aw * weights[i, :]

        y = E[i, :]

        # Handle missing values
        if np.any(np.isnan(y)):
            obs = np.isfinite(y)
            n_obs = np.sum(obs)
            if n_obs <= 2:
                continue

            X = design[obs, :]
            y_obs = y[obs]
            w_obs = w[obs]

            # Weighted least squares fit
            sqrt_w = np.sqrt(w_obs)
            X_w = X * sqrt_w[:, np.newaxis]
            y_w = y_obs * sqrt_w

            try:
                # Use full QR decomposition to get residual effects
                q, r = linalg.qr(X_w, mode="full")
                r_diag = np.diag(r[: X_w.shape[1], :])
                rank = np.sum(np.abs(r_diag) > 1e-10)
                df_resid = n_obs - rank
                if df_resid < 2:
                    continue

                qty = q.T @ y_w

                # Hat values from economic Q
                q_econ = q[:, : X_w.shape[1]]
                h1 = np.zeros(n_samples)
                h1[obs] = 1 - np.sum(q_econ**2, axis=1)

                # Weighted squared residuals
                resid_effects = qty[:rank]
                fitted = q[:, :rank] @ resid_effects
                resid = (y_w - fitted) / sqrt_w

                d = np.zeros(n_samples)
                d[obs] = w_obs * resid**2

                # Residual variance from effects beyond the fitted
                effects = qty[rank:]
                s2 = np.mean(effects**2)
            except (np.linalg.LinAlgError, ValueError):
                continue
        else:
            # No missing values - fit all at once
            sqrt_w = np.sqrt(w)
            X_w = design * sqrt_w[:, np.newaxis]
            y_w = y * sqrt_w

            try:
                # Use full QR decomposition
                q, r = linalg.qr(X_w, mode="full")
                r_diag = np.diag(r[: design.shape[1], :])
                rank = np.sum(np.abs(r_diag) > 1e-10)
                df_resid = n_samples - rank
                if df_resid < 2:
                    continue

                qty = q.T @ y_w

                # Hat values from economic Q
                q_econ = q[:, : design.shape[1]]
                h1 = 1 - np.sum(q_econ**2, axis=1)

                # Residuals
                fitted = q[:, :rank] @ qty[:rank]
                resid = (y_w - fitted) / sqrt_w
                d = w * resid**2

                # Residual variance
                effects = qty[rank:]
                s2 = np.mean(effects**2)
            except (np.linalg.LinAlgError, ValueError):
                continue

        if s2 < 1e-15:
            continue

        # Update information matrix
        # info = t(Z) %*% (h1 * Z)
        info = Z.T @ (h1[:, np.newaxis] * Z)

        # Schur complement update
        info2 = info2 + info[1:, 1:] - np.outer(info[1:, 0], info[0, 1:]) / info[0, 0]

        # Score
        z = d / s2 - h1
        dl = var_design.T @ z

        # Update gamma
        try:
            gam = gam + np.linalg.solve(info2, dl)
        except np.linalg.LinAlgError:
            continue

        # Update array weights
        aw = np.exp(var_design @ (-gam))

        if trace and (i + 1) % report_interval == 0:
            print(f"{i + 1} {aw.min():.6g} {aw.max():.6g}")

    return aw


def _array_weights_reml(
    E: np.ndarray,
    design: np.ndarray,
    var_design: np.ndarray,
    prior_n: float = 10,
    maxiter: int = 50,
    tol: float = 1e-5,
    trace: bool = False,
) -> np.ndarray:
    """
    Estimate array weights by REML.

    Faithful port of limma's .arrayWeightsREML (limma/R/arrayWeightsREML.R).
    Uses an exact Fisher scoring algorithm similar to statmod::remlscor:

    - Initial unweighted fit filters genes with zero residual variance.
    - Fisher information built from pairwise products of Q = QR-fit basis columns.
    - Score includes prior.n support toward w=1.
    - Convergence on score-step inner product, rescaled by ngam*(ngenes+prior.n).

    Parameters
    ----------
    E : ndarray
        Expression matrix, shape (n_genes, n_samples). Assumes no NAs or
        infinite values (caller handles filtering).
    design : ndarray
        Design matrix, shape (n_samples, p).
    var_design : ndarray
        Variance design matrix Z2 (columns sum to zero, excludes intercept).
    prior_n : float
        Prior sample size pulling weights toward 1.
    maxiter : int
        Maximum Fisher scoring iterations.
    tol : float
        Convergence tolerance on rescaled score-step inner product.
    """
    n_genes, n_samples = E.shape
    Z2 = var_design
    n_gam = Z2.shape[1]
    Z = np.column_stack([np.ones(n_samples), Z2])
    p = design.shape[1]
    p2 = (p * (p + 1)) // 2

    # Starting values
    gam = np.zeros(n_gam)
    w = np.ones(n_samples)
    convcrit_last = np.inf

    if trace:
        print("iter convcrit range(w)")

    # Initial unweighted fit to detect zero-variance genes and seed residuals.
    # Use full Q (narrays x narrays) so we can access the residual effects rows.
    q_u_full, r_u = linalg.qr(design, mode="full")
    rank_u = int(np.sum(np.abs(np.diag(r_u[:p, :])) > 1e-10))
    q_u = q_u_full[:, :p]  # first p cols = column space basis
    effects_u = q_u_full.T @ E.T  # (narrays, ngenes)
    effects_null = effects_u[rank_u:, :]  # residual effects
    s2 = np.mean(effects_null**2, axis=0)

    zero_var_filter = None
    if np.min(s2) < 1e-15:
        zero_var_filter = s2 >= 1e-15
        E = E[zero_var_filter, :]
        n_genes = E.shape[0]
        if n_genes < 2:
            return w
        s2 = s2[zero_var_filter]
        effects_u = effects_u[:, zero_var_filter]

    # Residuals (original scale): y - X beta = y - Q[:,:rank] @ Q[:,:rank].T @ y
    residuals = E - (q_u[:, :rank_u] @ effects_u[:rank_u, :]).T
    qr_factors = (q_u, rank_u)

    for iteration in range(1, maxiter + 1):
        if iteration > 1:
            sw = np.sqrt(w)
            design_w = design * sw[:, np.newaxis]
            E_w = E * sw
            q_full, r_it = linalg.qr(design_w, mode="full")
            rank_it = int(np.sum(np.abs(np.diag(r_it[:p, :])) > 1e-10))
            q_it = q_full[:, :p]
            effects = q_full.T @ E_w.T
            s2 = np.mean(effects[rank_it:, :] ** 2, axis=0)
            # R's lm.wfit$residuals returns residuals on the original (unweighted)
            # scale: y - X beta.
            beta = linalg.solve_triangular(
                r_it[:rank_it, :rank_it], effects[:rank_it, :], lower=False
            )
            residuals = (E.T - design @ beta).T
            qr_factors = (q_it, rank_it)

        q, rank = qr_factors

        # Q (narrays x p) = first p cols of full orthogonal matrix
        # qr.qy(qr, diag(n,p)) returns the first p cols of the full Q.
        # With economic QR we already have q of shape (narrays, p).
        Q = q  # shape (narrays, p)

        # Build Q2: p2 columns of pairwise Q products.
        # Columns grouped by offset k=0..p-1: cols (j0+1):(j0+p-k) = Q[,1:(p-k)] * Q[,(k+1):p]
        Q2 = np.zeros((n_samples, p2))
        j0 = 0
        for k in range(p):
            span = p - k
            Q2[:, j0 : j0 + span] = Q[:, :span] * Q[:, k : k + span]
            j0 += span
        if p > 1:
            Q2[:, p:p2] *= np.sqrt(2.0)

        # Hat values: row sums of diagonal product block (first p cols)
        h = np.sum(Q2[:, :p], axis=1)

        # Fisher info (including intercept for log-variance model)
        info = Z.T @ ((1.0 - 2.0 * h)[:, np.newaxis] * Z) + (Q2.T @ Z).T @ (Q2.T @ Z)

        # Schur complement removing intercept row/col
        info2 = info[1:, 1:] - np.outer(info[1:, 0], info[0, 1:]) / info[0, 0]

        # Score: colMeans over genes of w * residuals^2 / s2, then - (1 - h)
        # residuals is shape (ngenes, narrays). w * resid^2 / s2 gives per-gene
        # per-array contribution; colMeans = mean across genes, shape (narrays,).
        score_per = (w[np.newaxis, :] * residuals**2) / s2[:, np.newaxis]
        z = np.mean(score_per, axis=0) - (1.0 - h)

        # Add prior support
        info2 = n_genes * info2 + prior_n * (Z2.T @ Z2)
        z = n_genes * z + prior_n * (w - 1.0)

        dl = Z2.T @ z
        try:
            gamstep = linalg.solve(info2, dl)
        except linalg.LinAlgError:
            break

        convcrit = float(dl @ gamstep) / n_gam / (n_genes + prior_n)
        if not np.isfinite(convcrit) or convcrit >= convcrit_last:
            break
        convcrit_last = convcrit

        gam = gam + gamstep
        w = np.exp(Z2 @ (-gam))

        if trace:
            print(f"{iteration} {convcrit:.6g} {w.min():.6g} {w.max():.6g}")

        if convcrit < tol:
            break

    return w


def _array_weights_pr_wts_reml(
    E: np.ndarray,
    design: np.ndarray,
    weights: np.ndarray,
    var_design: np.ndarray,
    prior_n: float = 10,
    maxiter: int = 50,
    tol: float = 1e-5,
    trace: bool = False,
) -> np.ndarray:
    """
    Estimate array weights by REML allowing for prior observation weights.

    Faithful per-gene Fisher-scoring port of limma's
    .arrayWeightsPrWtsREML (limma/R/arrayWeightsPrWtsREML.R).

    Algorithmic structure mirrors :func:`_array_weights_reml` but accumulates
    Fisher information and the score *inside* the gene loop using
    `lm.wfit(design, y[g,], w * weights[g,])` semantics. Information and
    score are then averaged by `(ngenes + prior.n)` per
    `arrayWeightsPrWtsREML.R:65-66`.

    Parameters
    ----------
    E : ndarray
        Expression matrix, shape (n_genes, n_samples). Assumes no NA / Inf
        (caller filters rows).
    design : ndarray
        Design matrix, shape (n_samples, p).
    weights : ndarray
        Prior observation weights, shape (n_genes, n_samples). Non-NaN.
    var_design : ndarray
        Variance design matrix Z2 (intercept already excluded, columns sum
        to zero).
    prior_n : float
        Prior sample size pulling weights toward 1.
    maxiter : int
        Maximum Fisher-scoring iterations.
    tol : float
        Convergence tolerance on the rescaled score-step inner product.
    trace : bool
        If True, print iteration progress.
    """
    n_genes, n_samples = E.shape
    Z2 = var_design
    n_gam = Z2.shape[1]
    Z = np.column_stack([np.ones(n_samples), Z2])
    p = design.shape[1]
    p2 = (p * (p + 1)) // 2

    gam = np.zeros(n_gam)
    w = np.ones(n_samples)

    if trace:
        print("iter convcrit range(w)")

    for iteration in range(1, maxiter + 1):
        info2 = prior_n * (Z2.T @ Z2)
        z = prior_n * (w - 1.0)

        for g in range(n_genes):
            wg = w * weights[g, :]
            if np.any(wg <= 0):
                continue

            sw = np.sqrt(wg)
            X_w = design * sw[:, np.newaxis]
            y_w = E[g, :] * sw

            try:
                q_full, r_it = linalg.qr(X_w, mode="full")
            except linalg.LinAlgError:
                continue
            rank = int(np.sum(np.abs(np.diag(r_it[:p, :])) > 1e-10))
            if rank < p:
                # rank-deficient gene; skip rather than risk incompatible Q2
                continue

            Q = q_full[:, :p]
            effects = q_full.T @ y_w
            s2 = float(np.mean(effects[rank:] ** 2))

            # R: residuals are y - X beta on the original (unweighted) scale
            beta = linalg.solve_triangular(r_it[:rank, :rank], effects[:rank], lower=False)
            resid = E[g, :] - design @ beta

            Q2 = np.zeros((n_samples, p2))
            j0 = 0
            for k in range(p):
                span = p - k
                Q2[:, j0 : j0 + span] = Q[:, :span] * Q[:, k : k + span]
                j0 += span
            if p > 1:
                Q2[:, p:p2] *= np.sqrt(2.0)

            h = np.sum(Q2[:, :p], axis=1)
            info = Z.T @ ((1.0 - 2.0 * h)[:, np.newaxis] * Z) + (Q2.T @ Z).T @ (Q2.T @ Z)
            info2 = info2 + info[1:, 1:] - np.outer(info[1:, 0], info[0, 1:]) / info[0, 0]

            if s2 > 1e-15:
                z = z + wg * resid**2 / s2 - (1.0 - h)

        info2_avg = info2 / (n_genes + prior_n)
        z_avg = z / (n_genes + prior_n)

        dl = Z2.T @ z_avg
        try:
            gamstep = linalg.solve(info2_avg, dl)
        except linalg.LinAlgError:
            break

        gam = gam + gamstep
        w = np.exp(Z2 @ (-gam))

        convcrit = float(dl @ gamstep) / (n_genes + prior_n) / n_gam
        if trace:
            print(f"{iteration} {convcrit:.6g} {w.min():.6g} {w.max():.6g}")

        if not np.isfinite(convcrit):
            break
        if convcrit < tol:
            break

    return w


[docs] def array_weights( object, design: np.ndarray | None = None, weights: np.ndarray | None = None, var_design: np.ndarray | None = None, var_group: np.ndarray | None = None, prior_n: float = 10, method: str = "auto", maxiter: int = 50, tol: float = 1e-5, trace: bool = False, *, layer: str | None = None, weights_layer: str | None = None, ) -> np.ndarray: """ Estimate relative quality weights for each array/sample. Estimates the relative reliability of each sample in a gene expression experiment. Samples with higher variability get lower weights. Parameters ---------- object : dict Dict with 'E' (expression matrix) and optionally 'weights'. Typically the output from voom(). design : ndarray, optional Design matrix for the linear model. If None, taken from object or defaults to intercept-only. weights : ndarray, optional Prior observation weights. If None, taken from object. var_design : ndarray, optional Design matrix for the variance model. Columns should sum to zero. var_group : ndarray, optional Factor defining variance groups. Takes precedence over var_design. prior_n : float, default 10 Prior sample size for regularization. Higher values give more stable but less responsive estimates. method : str, default "auto" Estimation method: - "auto": Choose automatically (genebygene if weights or NAs present) - "genebygene": Gene-by-gene update algorithm - "reml": REML estimation (faster when no weights or NAs) maxiter : int, default 50 Maximum iterations for REML method. tol : float, default 1e-5 Convergence tolerance for REML method. trace : bool, default False If True, print iteration progress (array weight range) to stdout. Returns ------- ndarray Quality weights for each sample, shape (n_samples,). Higher weights indicate more reliable samples. Notes ----- Array weights are useful when some samples have higher technical variability than others. By downweighting noisy samples, the analysis gains power while remaining valid. The weights are relative (their geometric mean is approximately 1) and can be incorporated into downstream analysis via lm_fit(). """ # Polymorphic input: ndarray / dict / EList / AnnData. eawp = get_eawp(object, layer=layer, weights_layer=weights_layer) E = np.asarray(eawp["exprs"], dtype=np.float64) n_genes, n_samples = E.shape # Pick up design/weights from the input object if not passed explicitly. if design is None and eawp.get("design") is not None: design = np.asarray(eawp["design"], dtype=np.float64) if weights is None and eawp.get("weights") is not None: weights = np.asarray(eawp["weights"], dtype=np.float64) # Initial weights w = np.ones(n_samples) # Require at least 2 genes if n_genes < 2: return w # Default design if design is None: design = np.ones((n_samples, 1)) design = np.asarray(design, dtype=np.float64) # Reduce rank if not full rank q, r, pivot = linalg.qr(design, pivoting=True) p = np.sum(np.abs(np.diag(r)) > 1e-10) if p < design.shape[1]: design = design[:, pivot[:p]] # Require at least 2 residual df if n_samples - p < 2: return w # Check weights (already extracted from input object above if not passed) if weights is not None: weights = np.asarray(weights, dtype=np.float64) if weights.shape != E.shape: raise ValueError("weights must have same shape as expression matrix") if np.any(np.isnan(weights)): raise ValueError("NA weights not allowed") if np.any(weights < 0): raise ValueError("Negative weights not allowed") if np.any(np.isinf(weights)): raise ValueError("Infinite weights not allowed") # Handle zero weights by setting corresponding E to NA if np.any(weights == 0): E = E.copy() E[weights == 0] = np.nan weights = weights.copy() weights[weights == 0] = 1 # Handle var_group (takes precedence over var_design) if var_group is not None: var_group = np.asarray(var_group) if len(var_group) != n_samples: raise ValueError("var_group has wrong length") # Get unique levels unique_levels = np.unique(var_group) n_levels = len(unique_levels) if n_levels < 2: raise ValueError("Need at least two variance groups") # Create contrast-coded design matrix (sum to zero) # Similar to R's contr.sum var_design = np.zeros((n_samples, n_levels - 1)) for i, level in enumerate(unique_levels[:-1]): var_design[var_group == level, i] = 1 var_design[var_group == unique_levels[-1], :] = -1 # Setup variance design matrix if var_design is None: var_design = _contr_sum(n_samples) else: # Center columns (make them sum to zero) var_design = var_design - np.mean(var_design, axis=0) # Remove rank-deficient columns q_v, r_v, pivot_v = linalg.qr(var_design, pivoting=True) rank_v = np.sum(np.abs(np.diag(r_v)) > 1e-10) var_design = var_design[:, pivot_v[:rank_v]] # Detect NA values has_na = np.any(~np.isfinite(E)) if has_na: E = E.copy() E[~np.isfinite(E)] = np.nan # Choose method method = method.lower() if method == "auto": if has_na or weights is not None: method = "genebygene" else: method = "reml" if method == "genebygene": return _array_weights_gene_by_gene(E, design, weights, var_design, prior_n, trace=trace) elif method == "reml": if has_na: # Remove rows with any NA na_rows = np.any(np.isnan(E), axis=1) E = E[~na_rows, :] if weights is not None: weights = weights[~na_rows, :] if E.shape[0] < 2: return w if weights is None: return _array_weights_reml(E, design, var_design, prior_n, maxiter, tol, trace=trace) else: return _array_weights_pr_wts_reml( E, design, weights, var_design, prior_n, maxiter, tol, trace=trace, ) else: raise ValueError(f"Unknown method: {method}")
[docs] def array_weights_quick( y, fit: dict, *, layer: str | None = None, ) -> np.ndarray: """ Compute approximate array quality weights from a linear model fit. Faithful port of R limma's :func:`arrayWeightsQuick`. Computes ``1 / colMeans(res*res / (sigma^2 * (1 - h)))`` where ``h`` is the leverage diagonal of the design's hat matrix and ``res`` is the residual matrix from the fit. Parameters ---------- y : array_like Expression matrix, shape (n_genes, n_samples). fit : dict Linear model fit (from :func:`lm_fit`) carrying ``coefficients``, ``design``, and ``sigma``. If ``fit`` carries observation weights a warning is emitted (matches R: spot quality weights are not taken into account). Returns ------- ndarray Approximate array quality weights, shape (n_samples,). """ y = np.asarray(get_eawp(y, layer=layer)["exprs"], dtype=np.float64) if fit.get("weights") is not None: import warnings warnings.warn( "spot quality weights found but not taken into account", UserWarning, ) coefficients = np.asarray(fit["coefficients"], dtype=np.float64) design = np.asarray(fit["design"], dtype=np.float64) sigma = np.asarray(fit["sigma"], dtype=np.float64) res = y - coefficients @ design.T # Hat-matrix diagonal of the design itself (no intercept augmentation). q, _r, _pivot = linalg.qr(design, pivoting=True, mode="economic") rank = int(np.sum(np.abs(np.diag(_r)) > 1e-10)) q = q[:, :rank] h = np.sum(q**2, axis=1) mures2 = (sigma[:, np.newaxis] ** 2) * (1.0 - h)[np.newaxis, :] return 1.0 / np.nanmean(res * res / mures2, axis=0)
def modify_weights( weights, status, values, multipliers, ) -> np.ndarray: """ Multiply rows of a weights matrix by status-specific scalars. Port of R limma's ``modifyWeights``. For each entry in ``values``, rows whose ``status`` matches that entry are multiplied (in place on a copy) by the corresponding multiplier. Parameters ---------- weights : array_like Weights matrix of shape (n_features, n_arrays). 1-D inputs are promoted to a single-column matrix to mirror R's ``as.matrix(weights)``. status : array_like of str Per-row status labels. Length must equal ``nrow(weights)``. values : sequence of str Status values to act on. multipliers : float or sequence of float Multiplier for each value. A single multiplier is recycled to the length of ``values``. Returns ------- ndarray Updated weights matrix, same shape as the input (1-D inputs come back as a column matrix to match R). """ status = np.asarray([str(s) for s in status]) weights = np.asarray(weights, dtype=np.float64) if weights.ndim == 1: weights = weights.reshape(-1, 1) weights = weights.copy() values = [str(v) for v in values] multipliers = np.atleast_1d(np.asarray(multipliers, dtype=np.float64)) if status.size != weights.shape[0]: raise ValueError("nrows of weights must equal length of status") if multipliers.size == 1: multipliers = np.repeat(multipliers, len(values)) if len(values) != multipliers.size: raise ValueError("no. values doesn't match no. multipliers") for value, mult in zip(values, multipliers): mask = status == value if np.any(mask): weights[mask, :] = mult * weights[mask, :] return weights