Source code for stdpipe.photometry_measure

"""
Aperture photometry measurement routines.

This module contains functions for performing aperture photometry
on detected objects.
"""

import numpy as np
import photutils
import photutils.background
import photutils.aperture
import photutils.centroids
import photutils.psf
from photutils.utils import calc_total_error

# Note: psf and photometry_psf modules imported lazily in functions to avoid circular dependency

# Check if new SEP features are available (version 1.4+)
try:
    import sep

    _HAS_SEP_OPTIMAL = hasattr(sep, 'sum_circle_optimal') and hasattr(sep, 'stats_circann')
except ImportError:
    _HAS_SEP_OPTIMAL = False


def _is_callable_fwhm(fwhm):
    """True if ``fwhm`` is a position-dependent callable (e.g. FWHMMap)."""
    return (
        fwhm is not None
        and callable(fwhm)
        and not isinstance(fwhm, (bool, int, float, np.integer, np.floating))
    )


def _fwhm_scalar(fwhm):
    """Scalar FWHM summary (median for callables via ``float()``).

    Returns ``None`` when ``fwhm`` is missing, non-positive, or not convertible.
    """
    if fwhm is None:
        return None
    try:
        s = float(fwhm)
    except (TypeError, ValueError):
        return None
    if not np.isfinite(s) or s <= 0:
        return None
    return s


def _fwhm_at(fwhm, x, y):
    """Evaluate FWHM at positions ``x``/``y``.

    Returns a scalar for numeric ``fwhm``, an array for callable ``fwhm``,
    and ``None`` when ``fwhm`` is missing or non-positive.
    """
    s = _fwhm_scalar(fwhm)
    if s is None:
        return None
    if _is_callable_fwhm(fwhm):
        return np.asarray(fwhm(x, y), dtype=float)
    return s


def _extract_valid_positions(obj):
    """Extract object positions as plain arrays with validity mask.

    Handles MaskedColumn inputs by filling masked values with NaN.

    Parameters
    ----------
    obj : astropy.table.Table
        Table with 'x' and 'y' columns.

    Returns
    -------
    x_vals : ndarray
        X positions with masked values filled as NaN.
    y_vals : ndarray
        Y positions with masked values filled as NaN.
    valid_pos : ndarray of bool
        True where both x and y are finite.
    """
    x_vals = np.ma.filled(np.asarray(obj['x']), fill_value=np.nan)
    y_vals = np.ma.filled(np.asarray(obj['y']), fill_value=np.nan)
    valid_pos = np.isfinite(x_vals) & np.isfinite(y_vals)
    return x_vals, y_vals, valid_pos


def _prepare_image_and_mask(image, mask):
    """Sanitize image and mask for photometry.

    Parameters
    ----------
    image : ndarray
        Input image array.
    mask : ndarray of bool or None
        Optional boolean mask (True = masked).

    Returns
    -------
    image1 : ndarray
        Float64 copy of the image.
    mask0 : ndarray of bool
        Mask of non-finite pixels.
    mask : ndarray of bool
        Boolean mask array (zeros if input was None).
    """
    image1 = image.astype(np.double)
    mask0 = ~np.isfinite(image1)

    if mask is None:
        mask = np.zeros(image.shape, dtype=bool)
    else:
        mask = np.array(mask).astype(bool)

    return image1, mask0, mask


def _compute_magnitudes_and_filter(obj, sn, keep_negative, log):
    """Compute magnitudes from flux and apply S/N and positivity filters.

    Modifies obj in-place to add 'mag' and 'magerr' columns, then
    optionally returns a filtered subset.

    Parameters
    ----------
    obj : astropy.table.Table
        Table with 'flux' and 'fluxerr' columns.
    sn : float or None
        Minimum S/N ratio. None to skip filtering.
    keep_negative : bool
        If False, discard negative-flux measurements.
    log : callable
        Logging function.

    Returns
    -------
    obj : astropy.table.Table
        Possibly filtered table with 'mag' and 'magerr' columns set.
    """
    for col in ['mag', 'magerr']:
        obj[col] = np.nan

    idx = obj['flux'] > 0
    obj['mag'][idx] = -2.5 * np.log10(obj['flux'][idx])
    obj['magerr'][idx] = 2.5 / np.log(10) * obj['fluxerr'][idx] / obj['flux'][idx]

    if sn is not None and sn > 0:
        log('Filtering out measurements with S/N < %.1f' % sn)
        idx = np.isfinite(obj['magerr'])
        idx[idx] &= obj['magerr'][idx] < 1 / sn
        obj = obj[idx]

    if not keep_negative:
        log('Filtering out measurements with negative fluxes')
        idx = obj['flux'] > 0
        obj = obj[idx]

    return obj


def _get_psf_stamp_at_position(psf, x, y, stamp_size=None):
    """Get normalized PSF stamp at a given position with automatic sub-pixel alignment.

    For Gaussian PSF: Creates pixel-integrated PSF using the error function (erf)
    to properly integrate Gaussian flux over each pixel's area. This eliminates
    the FWHM-dependent systematic bias caused by point sampling at pixel centers:
      - FWHM = 1.5 pix: +11.6% bias -> ~0%
      - FWHM = 3.0 pix: +2.7% bias -> ~0%
      - FWHM = 6.0 pix: +0.8% bias -> ~0%

    The stamp is shifted by the sub-pixel offset (x - round(x), y - round(y))
    to align with the actual source position.

    For PSFEx/ePSF models: The psf.get_psf_stamp() function automatically applies
    the same sub-pixel shift internally, so no additional correction is needed here.
    Both methods produce identically aligned PSF stamps.

    Parameters
    ----------
    psf : dict or float
        PSF model (dict from PSFEx/ePSF, or Gaussian FWHM as float).
    x : float
        Object x position.
    y : float
        Object y position.
    stamp_size : int or None
        Optional fixed stamp size (odd integer).

    Returns
    -------
    psf_stamp : ndarray
        Normalized PSF stamp aligned to sub-pixel position (x, y).

    Notes
    -----
    Technical details of pixel-integrated Gaussian:

    - Uses erf (error function = CDF of Gaussian) to integrate over pixel boundaries
    - Each pixel spans from (i-0.5, i+0.5) in both x and y
    - Flux in pixel (i,j) = integral of Gaussian(x,y) dx dy over pixel area
    - This is computed as: [CDF(x+0.5) - CDF(x-0.5)] * [CDF(y+0.5) - CDF(y-0.5)]
    """
    if isinstance(psf, dict):
        # PSFEx or ePSF model - lazy import to avoid circular dependency
        from stdpipe import psf as psf_module

        psf_stamp = psf_module.get_psf_stamp(psf, x=x, y=y, normalize=True)
    else:
        # Gaussian PSF - create stamp from FWHM with sub-pixel shift
        # Use pixel integration to eliminate FWHM-dependent systematic bias
        from scipy.special import erf

        fwhm = float(psf)
        sigma = fwhm / (2 * np.sqrt(2 * np.log(2)))
        size = stamp_size if stamp_size else int(np.ceil(fwhm * 3)) * 2 + 1

        # CRITICAL: Shift PSF by sub-pixel offset to align with actual source position
        # This eliminates position-dependent biases:
        #   - Centroiding bias: ~0.2 pix → ~0.01 pix (20× improvement)
        #   - Optimal extraction flux bias from sub-pixel position: eliminated
        ix, iy = int(np.round(x)), int(np.round(y))
        dx_sub = x - ix  # Sub-pixel offset (e.g., 0.3 if x=25.3)
        dy_sub = y - iy

        # PSF center in stamp coordinates (accounting for sub-pixel shift)
        x0 = size // 2 + dx_sub
        y0 = size // 2 + dy_sub

        # Pixel edges: pixels span from (i-0.5) to (i+0.5)
        x_edges = np.arange(size + 1) - 0.5
        y_edges = np.arange(size + 1) - 0.5

        # Integrate Gaussian over pixel boundaries using error function
        # erf(z) = (2/√π) ∫₀^z exp(-t²) dt is the CDF of standard normal
        # CDF of Gaussian N(μ,σ²) at point x is: Φ(x) = 0.5 * [1 + erf((x-μ)/(σ√2))]
        sqrt2_sigma = np.sqrt(2) * sigma
        cdf_x = 0.5 * (1 + erf((x_edges - x0) / sqrt2_sigma))
        cdf_y = 0.5 * (1 + erf((y_edges - y0) / sqrt2_sigma))

        # Flux in each pixel = CDF at right edge - CDF at left edge
        # For pixel i: flux = CDF(i+0.5) - CDF(i-0.5)
        flux_x = np.diff(cdf_x)  # Integrated flux in x direction
        flux_y = np.diff(cdf_y)  # Integrated flux in y direction

        # 2D PSF is separable: outer product of 1D integrated profiles
        psf_stamp = np.outer(flux_y, flux_x)

        # Normalize to unit sum
        psf_stamp /= np.sum(psf_stamp)

    return psf_stamp


def _psf_centroid(image, x, y, psf, mask=None, box_size=None, maxiter=16, tol=1e-4):
    """Compute PSF-weighted centroid at a given position.

    Uses PSF weighting analogous to optimal extraction (Naylor 1998),
    which naturally emphasizes high-S/N pixels in the PSF core.
    This provides better stability and accuracy than simple center-of-mass,
    especially for crowded fields and faint sources.

    Iterates internally until convergence (position shift < tol) or maxiter
    is reached. Due to the I*P weighting, each iteration only recovers half
    the remaining offset, so ~10-16 iterations are needed for sub-0.01 pix
    accuracy from a cold start.

    Parameters
    ----------
    image : ndarray
        Background-subtracted image.
    x : float
        Initial x position estimate.
    y : float
        Initial y position estimate.
    psf : dict or float
        PSF model (dict from PSFEx/ePSF, or Gaussian FWHM as float).
    mask : ndarray of bool or None
        Optional mask (True = masked).
    box_size : int or None
        Minimum cutout size in pixels (actual size is max of this and PSF stamp size).
    maxiter : int
        Maximum number of internal iterations.
    tol : float
        Convergence tolerance in pixels.

    Returns
    -------
    x_new : float
        Refined x position, or np.nan on failure.
    y_new : float
        Refined y position, or np.nan on failure.
    """

    # Determine box_size from PSF stamp size on the first call
    psf_stamp = _get_psf_stamp_at_position(psf, x, y)
    stamp_size = psf_stamp.shape[0]

    if box_size is None:
        box_size = stamp_size
    else:
        box_size = max(box_size, stamp_size)
        if box_size % 2 == 0:
            box_size += 1

    half = box_size // 2

    for iteration in range(maxiter):
        # Integer position for cutout center
        ix, iy = int(np.round(x)), int(np.round(y))

        # Extract cutout
        y0, y1 = iy - half, iy + half + 1
        x0, x1 = ix - half, ix + half + 1

        if y0 < 0 or x0 < 0 or y1 > image.shape[0] or x1 > image.shape[1]:
            return np.nan, np.nan

        data_cutout = image[y0:y1, x0:x1]

        if mask is not None:
            mask_cutout = mask[y0:y1, x0:x1]
            good = ~mask_cutout & np.isfinite(data_cutout) & (data_cutout > 0)
        else:
            good = np.isfinite(data_cutout) & (data_cutout > 0)

        if np.sum(good) < 3:
            return np.nan, np.nan

        # Get PSF stamp at current position (shifted by sub-pixel offset)
        psf_stamp = _get_psf_stamp_at_position(psf, x, y)

        # Embed PSF stamp in box-sized array if needed
        if psf_stamp.shape[0] != box_size:
            psf_resized = np.zeros((box_size, box_size))
            psf_half = psf_stamp.shape[0] // 2
            box_half = box_size // 2
            y_start = box_half - psf_half
            x_start = box_half - psf_half
            psf_resized[
                y_start : y_start + psf_stamp.shape[0], x_start : x_start + psf_stamp.shape[1]
            ] = psf_stamp
            psf_stamp = psf_resized

        # Coordinate grids in image coordinates
        yy, xx = np.mgrid[y0:y1, x0:x1]

        I = data_cutout[good]
        P = psf_stamp[good]

        # Offsets from current floating-point position
        x_off = xx[good].astype(float) - x
        y_off = yy[good].astype(float) - y

        sum_IP = np.sum(I * P)
        if sum_IP <= 0:
            return np.nan, np.nan

        dx = np.sum(x_off * I * P) / sum_IP
        dy = np.sum(y_off * I * P) / sum_IP

        x += dx
        y += dy

        if dx * dx + dy * dy < tol * tol:
            break

    return x, y


def _solve_weighted_leastsq(A, D, W):
    """Solve weighted least squares: (A^T W A)x = A^T W D.

    Parameters
    ----------
    A : ndarray
        Design matrix, shape (npix, K) where K is number of sources.
    D : ndarray
        Data vector, shape (npix,).
    W : ndarray
        Weight vector, shape (npix,), inverse variance weights.

    Returns
    -------
    x : ndarray or None
        Solution vector, or None on failure.
    cov : ndarray or None
        Covariance matrix, or None on failure.
    """
    # A^T W A and A^T W D using weight vector
    # Near-singular systems (e.g. overlapping sources) can produce
    # non-finite intermediates — catch and return failure gracefully
    with np.errstate(divide='ignore', over='ignore', invalid='ignore'):
        AtW = A.T * W  # Broadcasting: (K, npix) * (npix,) -> (K, npix)
        AtWA = AtW @ A  # (K, K)
        AtWD = AtW @ D  # (K,)

    if not np.all(np.isfinite(AtWA)) or not np.all(np.isfinite(AtWD)):
        return None, None

    # Solve and get covariance
    try:
        x = np.linalg.solve(AtWA, AtWD)
        if not np.all(np.isfinite(x)):
            return None, None
        cov = np.linalg.solve(AtWA, np.eye(AtWA.shape[0]))
    except np.linalg.LinAlgError:
        return None, None

    return x, cov


def _grouped_optimal_extraction(image, err, positions, psf, bg_local=None, mask=None, radius=None):
    """Perform grouped optimal extraction for multiple overlapping sources.

    Solves D = sum_k F_k * P_k + B via weighted least squares (A^T W A)x = A^T W D.
    Simultaneously fits the fluxes of all sources in a group, properly
    accounting for flux sharing between overlapping PSFs.

    Parameters
    ----------
    image : ndarray
        Background-subtracted image.
    err : ndarray
        Error/noise map (sqrt of variance).
    positions : list of tuple
        List of (x, y) positions for sources in group.
    psf : dict or float
        PSF model (dict from PSFEx/ePSF, or Gaussian FWHM as float).
    bg_local : list, ndarray, float, or None
        Local background values for each source, or single value for all.
    mask : ndarray of bool or None
        Optional mask (True = masked).
    radius : float or None
        Extraction radius in pixels around each source.

    Returns
    -------
    results : list of tuple
        List of (flux, fluxerr, npix, norm, reduced_chi2) for each source.
    """
    K = len(positions)
    if K == 0:
        return []

    # For single source, use standard optimal extraction
    if K == 1:
        x, y = positions[0]
        bg = bg_local[0] if isinstance(bg_local, (list, np.ndarray)) else bg_local
        result = _optimal_extraction(image, err, x, y, psf, bg_local=bg, mask=mask, radius=radius)
        return [result]

    # Get PSF stamp size from first position
    test_stamp = _get_psf_stamp_at_position(psf, positions[0][0], positions[0][1])
    stamp_size = test_stamp.shape[0]
    half = stamp_size // 2

    # Determine bounding box covering all sources + PSF radius
    xs = np.array([p[0] for p in positions])
    ys = np.array([p[1] for p in positions])

    # Integer bounds with padding for PSF
    x0 = int(np.floor(np.min(xs))) - half
    x1 = int(np.ceil(np.max(xs))) + half + 1
    y0 = int(np.floor(np.min(ys))) - half
    y1 = int(np.ceil(np.max(ys))) + half + 1

    # Handle image boundaries
    if x0 < 0 or y0 < 0 or x1 > image.shape[1] or y1 > image.shape[0]:
        # Fall back to individual fitting for edge groups
        results = []
        for i, (x, y) in enumerate(positions):
            bg = bg_local[i] if isinstance(bg_local, (list, np.ndarray)) else bg_local
            results.append(
                _optimal_extraction(image, err, x, y, psf, bg_local=bg, mask=mask, radius=radius)
            )
        return results

    # Extract common cutout
    data_cutout = image[y0:y1, x0:x1].copy()
    err_cutout = err[y0:y1, x0:x1]
    cutout_shape = data_cutout.shape

    # Build mask for good pixels
    if mask is not None:
        mask_cutout = mask[y0:y1, x0:x1]
        good = ~mask_cutout & (err_cutout > 0) & np.isfinite(data_cutout)
    else:
        good = (err_cutout > 0) & np.isfinite(data_cutout)

    # Apply radius cutoff around each source if specified
    if radius is not None:
        yy, xx = np.mgrid[0 : cutout_shape[0], 0 : cutout_shape[1]]
        radius_mask = np.zeros(cutout_shape, dtype=bool)
        for x, y in positions:
            # Position relative to cutout
            rel_x = x - x0
            rel_y = y - y0
            dist = np.sqrt((xx - rel_x) ** 2 + (yy - rel_y) ** 2)
            radius_mask |= dist <= radius
        good &= radius_mask

    n_pix = int(np.sum(good))
    if n_pix == 0:
        return [(np.nan, np.nan, 0, np.nan, np.nan) for _ in range(K)]

    # Subtract average local background to center data (fit includes residual background term)
    bg_offset = 0.0
    if bg_local is not None:
        if isinstance(bg_local, (list, np.ndarray)):
            bg_offset = np.nanmean(bg_local)
        else:
            bg_offset = bg_local
        if np.isfinite(bg_offset) and bg_offset != 0:
            data_cutout -= bg_offset

    # Build design matrix with K PSF columns (+ optional background term)
    # Only fit background if bg_local was provided (i.e., we subtracted source-specific backgrounds)
    fit_background = bg_local is not None and bg_offset != 0
    n_params = K + 1 if fit_background else K
    A = np.zeros((n_pix, n_params))
    psf_norms = np.zeros(K)

    for k, (x, y) in enumerate(positions):
        # Get PSF at this source position
        psf_stamp = _get_psf_stamp_at_position(psf, x, y, stamp_size)

        # Position of PSF center relative to cutout origin
        rel_x = x - x0
        rel_y = y - y0

        # Integer position and sub-pixel offset
        ix_rel = int(np.round(rel_x))
        iy_rel = int(np.round(rel_y))

        # PSF stamp placement bounds in cutout
        p_y0 = iy_rel - half
        p_y1 = iy_rel + half + 1
        p_x0 = ix_rel - half
        p_x1 = ix_rel + half + 1

        # Create full-size PSF array for this source
        psf_full = np.zeros(cutout_shape)

        # Handle partial overlap at cutout edges
        # Source region in cutout
        c_y0 = max(0, p_y0)
        c_y1 = min(cutout_shape[0], p_y1)
        c_x0 = max(0, p_x0)
        c_x1 = min(cutout_shape[1], p_x1)

        # Corresponding region in PSF stamp
        s_y0 = c_y0 - p_y0
        s_y1 = stamp_size - (p_y1 - c_y1)
        s_x0 = c_x0 - p_x0
        s_x1 = stamp_size - (p_x1 - c_x1)

        if c_y1 > c_y0 and c_x1 > c_x0:
            psf_full[c_y0:c_y1, c_x0:c_x1] = psf_stamp[s_y0:s_y1, s_x0:s_x1]

        A[:, k] = psf_full[good]
        psf_norms[k] = np.sum(psf_full[good])

    # Common background term (only if we subtracted varying local backgrounds)
    if fit_background:
        A[:, -1] = 1.0

    # Data and weights
    D = data_cutout[good]
    V = err_cutout[good] ** 2

    # Use simplified weighting (Naylor 1998) to match single-source behavior:
    # Instead of full variance weighting W=1/V, use W=1 (unweighted least squares)
    # This makes grouped and single-source give identical results
    W = np.ones_like(V)

    # Solve unweighted least squares (for flux consistency with single-source)
    x_sol, cov = _solve_weighted_leastsq(A, D, W)

    # Scale covariance by variance for error estimation
    # (flux errors still use variance properly)
    if x_sol is not None and cov is not None:
        # Adjust covariance to account for actual data variance
        # For proper error estimation: scale cov by mean variance in aperture
        mean_var = np.mean(V)
        cov = cov * mean_var

    if x_sol is None:
        # Matrix singularity - fall back to individual fitting
        results = []
        for i, (x, y) in enumerate(positions):
            bg = bg_local[i] if isinstance(bg_local, (list, np.ndarray)) else bg_local
            results.append(
                _optimal_extraction(image, err, x, y, psf, bg_local=bg, mask=mask, radius=radius)
            )
        return results

    # Extract fluxes and errors
    fluxes = x_sol[:K]
    flux_errors = np.sqrt(np.diag(cov)[:K])

    # Compute chi-squared for the group fit
    # Use variance weighting for chi2 (not W, which is 1 for unweighted fit)
    # This gives the proper goodness-of-fit metric: χ² = Σ((data - model)² / variance)
    with np.errstate(divide='ignore', over='ignore', invalid='ignore'):
        model = A @ x_sol
        residuals = D - model
        chi2 = np.sum(residuals**2 / V)

    # Reduced chi-squared (degrees of freedom = N - n_params)
    dof = n_pix - n_params
    if dof > 0:
        reduced_chi2 = chi2 / dof
    else:
        reduced_chi2 = np.nan

    # Build results for each source
    results = []
    for k in range(K):
        results.append((fluxes[k], flux_errors[k], n_pix, psf_norms[k], reduced_chi2))

    return results


def _optimal_extraction(image, err, x, y, psf, bg_local=None, mask=None, radius=None):
    """Perform optimal extraction photometry at a single position.

    Algorithm from Naylor (1998, MNRAS 296, 339) based on Horne (1986),
    using simplified weighting for robustness to PSF errors.

    flux = sum(P * D) / sum(P^2)
    variance = sum(P^2 * V) / (sum(P^2))^2

    Parameters
    ----------
    image : ndarray
        Background-subtracted image.
    err : ndarray
        Error/noise map (sqrt of variance).
    x : float
        Object x position.
    y : float
        Object y position.
    psf : dict or float
        PSF model (dict from PSFEx/ePSF, or Gaussian FWHM as float).
    bg_local : float or None
        Optional local background value to be additionally subtracted
        prior to measurement.
    mask : ndarray of bool or None
        Optional mask (True = masked).
    radius : float or None
        Extraction radius in pixels (default: use full PSF stamp).

    Returns
    -------
    flux : float
        Extracted flux.
    fluxerr : float
        Flux uncertainty.
    npix : int
        Number of pixels used.
    norm : float
        PSF normalization factor.
    reduced_chi2 : float
        Reduced chi-squared of the fit.
    """
    # Get PSF stamp at object position (automatically includes sub-pixel shift)
    psf_stamp = _get_psf_stamp_at_position(psf, x, y)

    # Determine extraction region
    stamp_size = psf_stamp.shape[0]
    half = stamp_size // 2

    # Integer position for cutout
    ix, iy = int(np.round(x)), int(np.round(y))

    # Extract cutouts
    y0, y1 = iy - half, iy + half + 1
    x0, x1 = ix - half, ix + half + 1

    # Handle edge cases
    if y0 < 0 or x0 < 0 or y1 >= image.shape[0] or x1 >= image.shape[1]:
        return np.nan, np.nan, 0, np.nan, np.nan

    data_cutout = image[y0:y1, x0:x1]
    err_cutout = err[y0:y1, x0:x1]

    # Apply mask if provided
    if mask is not None:
        mask_cutout = mask[y0:y1, x0:x1]
        good = ~mask_cutout & (err_cutout > 0) & np.isfinite(data_cutout)
    else:
        good = (err_cutout > 0) & np.isfinite(data_cutout)

    # Apply radius cutoff if specified
    if radius is not None:
        yy, xx = np.mgrid[0:stamp_size, 0:stamp_size]
        dist = np.sqrt((xx - half) ** 2 + (yy - half) ** 2)
        good &= dist <= radius

    if np.sum(good) == 0:
        return np.nan, np.nan, 0, np.nan, np.nan

    # Variance = err^2
    var = err_cutout**2

    # Optimal extraction with simplified weighting (Naylor 1998)
    P = psf_stamp[good]
    D = data_cutout[good]
    V = var[good]

    if bg_local:
        D -= bg_local

    # Simplified weighting: flux = Σ(P × D) / Σ(P²)
    sum_P2 = np.sum(P**2)

    if sum_P2 <= 0:
        return np.nan, np.nan, 0, np.nan, np.nan

    flux = np.sum(P * D) / sum_P2

    # variance = Σ(P² × V) / (Σ(P²))²
    variance = np.sum(P**2 * V) / (sum_P2**2)
    fluxerr = np.sqrt(variance)

    # Compute chi-squared: χ² = Σ((D - flux × P)² / V)
    residuals = D - flux * P
    chi2 = np.sum(residuals**2 / V)

    # Reduced chi-squared (degrees of freedom = N - 1)
    n_pix = int(np.sum(good))
    if n_pix > 1:
        reduced_chi2 = chi2 / (n_pix - 1)
    else:
        reduced_chi2 = np.nan

    return flux, fluxerr, n_pix, np.sum(P), reduced_chi2


[docs] def measure_objects( obj, image, aper=3, bkgann=None, bkg_order=0, fwhm=None, psf=None, optimal=False, group_sources=True, grouper_radius=None, mask=None, bg=None, err=None, gain=None, bg_size=64, sn=None, centroid_iter=0, centroid_method='com', keep_negative=True, get_bg=False, verbose=False, ): """Photometry at the positions of already detected objects. Supports both standard aperture photometry and optimal extraction that provides ~10% S/N improvement for point sources (Naylor 1998). It will estimate and subtract the background unless external background estimation (``bg``) is provided, and use user-provided noise map (``err``) if requested. The results may optionally be filtered to drop detections with low signal to noise ratio if ``sn`` is set and positive. It will also filter out events with negative flux unless ``keep_negative`` is True. Parameters ---------- obj : astropy.table.Table Table with initial object detections to be measured. image : ndarray Input image as a NumPy array. aper : float Circular aperture radius in pixels for flux measurement. For optimal extraction, this is the clipping radius. bkgann : tuple of float or None Background annulus (inner, outer radii) for local background estimation. If None, global background model is used instead. bkg_order : int Polynomial order for local background fitting. 0 = constant (mean), 1 = plane (linear gradient, recommended), 2 = quadratic surface. Only used if ``bkgann`` is set. fwhm : float, callable, or None If provided, ``aper`` and ``bkgann`` are measured in units of FWHM. Also used to define Gaussian PSF for optimal extraction if ``psf`` is not provided. A callable (e.g. a :class:`stdpipe.photometry.FWHMMap`) is accepted and evaluated at each source position for the optimal-extraction PSF width; aperture/bkgann scaling, PSF-weighted centroiding, and local background annuli still use the scalar summary ``float(fwhm)``. psf : dict or None PSF model for optimal extraction and PSF-weighted centroiding. Can be a dict from ``psf.run_psfex()``, ``psf.load_psf()``, or ``psf.create_psf_model()``. If None, a Gaussian PSF is created from the ``fwhm`` parameter. optimal : bool If True, use optimal extraction instead of aperture photometry. Requires either ``psf`` or ``fwhm`` to define the PSF profile. group_sources : bool If True and ``optimal=True``, use grouped optimal extraction for overlapping sources. Fits nearby sources simultaneously for better accuracy in crowded fields. grouper_radius : float or None Radius in pixels for grouping nearby sources. Sources within this distance are fitted simultaneously. If None, defaults to 2*aper. Only used if ``group_sources=True``. mask : ndarray of bool or None Image mask (True values are masked). bg : ndarray or None If provided, use this background instead of automatically computed one. Must have the same shape as input image. err : ndarray or None Image noise map to use instead of automatically computed one. gain : float or None Image gain in e-/ADU, used to build image noise model. bg_size : int Background grid size in pixels. sn : float or None Minimal S/N ratio. If set, measurements with magnitude errors exceeding 1/sn will be discarded. centroid_iter : int Number of centroiding iterations to run before photometry. centroid_method : str Centroiding method: 'com' (center-of-mass) or 'psf' (PSF-weighted). PSF-weighted is more accurate but degrades with heavy random masking (>20%). Requires ``psf`` or ``fwhm`` parameter. keep_negative : bool If False, measurements with negative fluxes will be discarded. get_bg : bool If True, also return estimated background and background noise images. verbose : bool or callable Whether to show verbose messages. May be boolean or a print-like function. Returns ------- obj : astropy.table.Table Copy of original table with ``flux``, ``fluxerr``, ``mag`` and ``magerr`` columns replaced with measured values. bg_image : ndarray Background image (only returned if ``get_bg=True``). err_image : ndarray Error image (only returned if ``get_bg=True``). Notes ----- Quality flags set in the ``flags`` column: - 0x200: At least one aperture pixel is masked - 0x400: Invalid position or local background estimation failed - 0x800: Optimal extraction failed (NaN result) - 0x1000: Poor fit quality (chi2 > 1000, numerical instability) """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if not len(obj): log('No objects to measure') return obj # Operate on the copy of the list obj = obj.copy() image1, mask0, mask = _prepare_image_and_mask(image, mask) if bg is None or err is None or get_bg: log('Estimating global background with %dx%d grid' % (bg_size, bg_size)) bg_est = photutils.background.Background2D( image1, bg_size, mask=mask | mask0, exclude_percentile=90 ) bg_est_bg = bg_est.background bg_est_rms = bg_est.background_rms else: bg_est = None if bg is None: log( 'Subtracting global background: median %.1f rms %.2f' % (np.median(bg_est_bg), np.std(bg_est_bg)) ) image1 -= bg_est_bg else: log( 'Subtracting user-provided background: median %.1f rms %.2f' % (np.median(bg), np.std(bg)) ) image1 -= bg image1[mask0] = 0 if err is None: log( 'Using global background noise map: median %.1f rms %.2f + gain %.1f' % ( np.median(bg_est_rms), np.std(bg_est_rms), gain if gain else np.inf, ) ) err = bg_est_rms if gain: err = calc_total_error(image1, err, gain) else: log('Using user-provided noise map: median %.1f rms %.2f' % (np.median(err), np.std(err))) fwhm_s = _fwhm_scalar(fwhm) fwhm_is_callable = _is_callable_fwhm(fwhm) if fwhm_s is not None: if fwhm_is_callable: log('Scaling aperture radii with spatial FWHM (median %.1f pix)' % fwhm_s) else: log('Scaling aperture radii with FWHM %.1f pix' % fwhm_s) aper = aper * fwhm_s log('Using aperture radius %.1f pixels' % aper) # Centroiding if centroid_iter: box_size = int(np.ceil(aper)) if box_size % 2 == 0: box_size += 1 # Determine centroiding method use_psf_centroid = centroid_method == 'psf' and (psf is not None or fwhm_s is not None) if use_psf_centroid: # Prepare PSF for centroiding if psf is not None: psf_for_centroid = psf log( 'Using PSF-weighted centroiding with %d iterations within %dx%d box' % (centroid_iter, box_size, box_size) ) else: # PSF-weighted centroiding uses a scalar FWHM; for a spatial # model, the scalar summary (median) is a sensible proxy. psf_for_centroid = fwhm_s log( 'Using PSF-weighted centroiding (Gaussian FWHM=%.1f) with %d iterations within %dx%d box' % (fwhm_s, centroid_iter, box_size, box_size) ) else: log( 'Using COM centroiding with %d iterations within %dx%d box' % (centroid_iter, box_size, box_size) ) # Keep original pixel positions obj['x_orig'] = np.array(obj['x']) obj['y_orig'] = np.array(obj['y']) # Combined mask for centroiding centroid_mask = mask | mask0 # Get plain arrays to avoid MaskedColumn warnings xs = np.array(obj['x'], dtype=float) ys = np.array(obj['y'], dtype=float) # Process each object individually for i in range(len(obj)): x, y = xs[i], ys[i] # Skip invalid positions (NaN or outside image with margin for box) if not np.isfinite(x) or not np.isfinite(y): continue if x < box_size // 2 or x >= image1.shape[1] - box_size // 2: continue if y < box_size // 2 or y >= image1.shape[0] - box_size // 2: continue # Iterative centroiding for _ in range(centroid_iter): if use_psf_centroid: # PSF-weighted centroiding x_new, y_new = _psf_centroid( image1, x, y, psf_for_centroid, mask=centroid_mask, box_size=box_size ) else: # Standard COM centroiding # Extract cutout around current position x_int, y_int = int(round(x)), int(round(y)) x_min = x_int - box_size // 2 x_max = x_int + box_size // 2 + 1 y_min = y_int - box_size // 2 y_max = y_int + box_size // 2 + 1 # Check bounds (position may have shifted during iteration) if x_min < 0 or y_min < 0 or x_max > image1.shape[1] or y_max > image1.shape[0]: break cutout = image1[y_min:y_max, x_min:x_max] # Let's only use positive pixels for centroiding cutout_mask = centroid_mask[y_min:y_max, x_min:x_max] | (cutout < 0) # Skip if footprint is fully masked if np.all(cutout_mask): break # Compute centroid in cutout coordinates x_c, y_c = photutils.centroids.centroid_com(cutout, mask=cutout_mask) # Skip if centroid computation failed if not np.isfinite(x_c) or not np.isfinite(y_c): break # Convert back to image coordinates x_new = x_min + x_c y_new = y_min + y_c # Check convergence if np.isfinite(x_new) and np.isfinite(y_new): shift = np.sqrt((x_new - x) ** 2 + (y_new - y) ** 2) x, y = x_new, y_new if shift < 0.01: # Converged (0.01 pixel threshold) break else: # Centroid computation failed, keep previous position break # Update object position obj['x'][i] = x obj['y'][i] = y if 'flags' not in obj.keys(): obj['flags'] = 0 # Dedicated column for local background on top of global estimation obj['bg_local'] = 0 # Local background - it has to be computed prior to optimal photometry # as otherwise it is too difficult to subtract it from its (weighted) results if bkgann is not None and len(bkgann) == 2: if fwhm_s is not None: # LocalBackground/GradientLocalBackground both take scalar radii, # so we use the median of the spatial FWHM here. bkgann = [_ * fwhm_s for _ in bkgann] order_names = {0: 'constant (mean)', 1: 'plane', 2: 'quadratic'} order_name = order_names.get(bkg_order, f'order {bkg_order}') log( 'Using local background annulus between %.1f and %.1f pixels with %s fitting' % (bkgann[0], bkgann[1], order_name) ) x_vals, y_vals, valid_pos = _extract_valid_positions(obj) # Initialize bg_local with zeros obj['bg_local'] = 0.0 # Only compute local background for valid positions if np.any(valid_pos): if bkg_order == 0: # Use photutils LocalBackground for order=0 (performance) lbg = photutils.background.LocalBackground( bkgann[0], bkgann[1], bkg_estimator=photutils.background.ModeEstimatorBackground(), ) obj['bg_local'][valid_pos] = lbg( image1, x_vals[valid_pos], y_vals[valid_pos], mask=mask | mask0 ) else: # Use gradient-aware fitting for order > 0 # GradientLocalBackground handles array inputs efficiently # Lazy import to avoid circular dependency from .photometry_psf import GradientLocalBackground grad_bkg = GradientLocalBackground(bkgann[0], bkgann[1], order=bkg_order) obj['bg_local'][valid_pos] = grad_bkg( image1, x_vals[valid_pos], y_vals[valid_pos], mask=mask | mask0 ) # Flag invalid positions and positions where local bg estimation failed idx = ~valid_pos | ~np.isfinite(obj['bg_local']) obj['flags'][idx] |= 0x400 obj['bg_local'][idx] = 0 # Photometric apertures x_vals, y_vals, valid_pos = _extract_valid_positions(obj) valid_idx = np.where(valid_pos)[0] apertures = None obj['npix_aper'] = 0.0 obj['bg_fluxerr'] = 0.0 # Local background flux error inside the aperture if np.any(valid_pos): positions = list(zip(x_vals[valid_pos], y_vals[valid_pos])) apertures = photutils.aperture.CircularAperture(positions, r=aper) # Check whether some aperture pixels are masked, and set the flags for that mres = photutils.aperture.aperture_photometry(mask | mask0, apertures, method='center') obj['flags'][valid_idx[mres['aperture_sum'] > 0]] |= 0x200 # Aperture unmasked areas, in (fractional) pixels image_ones = np.ones(image1.shape) res_area = photutils.aperture.aperture_photometry(image_ones, apertures, mask=mask0) obj['npix_aper'][valid_idx] = res_area['aperture_sum'] # Position-dependent background flux error from global background model, if available if bg_est is not None: res = photutils.aperture.aperture_photometry(bg_est_rms**2, apertures) obj['bg_fluxerr'][valid_idx] = np.sqrt(res['aperture_sum']) if optimal: # Optimal extraction photometry (Naylor 1998) log('Using optimal extraction photometry') # Determine PSF model. A callable ``fwhm`` (e.g. FWHMMap) yields a # per-source Gaussian width in the ungrouped path below. if psf is not None: psf_for_extraction = psf log('Using provided PSF model for optimal extraction') elif fwhm_s is not None: psf_for_extraction = fwhm if fwhm_is_callable else fwhm_s if fwhm_is_callable: log('Using spatial Gaussian PSF for optimal extraction (median FWHM=%.1f)' % fwhm_s) else: log('Using Gaussian PSF with FWHM=%.1f for optimal extraction' % fwhm_s) else: raise ValueError("Either 'psf' or 'fwhm' must be provided for optimal extraction") # Initialize output columns obj['flux'] = np.nan obj['fluxerr'] = np.nan obj['npix_optimal'] = 0 obj['chi2_optimal'] = np.nan obj['norm_optimal'] = np.nan if group_sources: # Grouped optimal extraction for crowded fields if grouper_radius is None: grouper_radius = 2 * aper log('Using grouped optimal extraction with grouper radius %.1f pixels' % grouper_radius) # Add group columns obj['group_id'] = -1 obj['group_size'] = 0 # Use SourceGrouper to identify groups grouper = photutils.psf.SourceGrouper(min_separation=grouper_radius) x_vals, y_vals, valid_pos = _extract_valid_positions(obj) # Get group IDs only for valid positions, then map back if np.any(valid_pos): group_ids_valid = grouper(x_vals[valid_pos], y_vals[valid_pos]) # Map group IDs back to full array (-1 for invalid positions) group_ids = np.full(len(obj), -1, dtype=int) group_ids[valid_pos] = group_ids_valid else: group_ids = np.full(len(obj), -1, dtype=int) obj['group_id'] = group_ids # Process each group (skip -1 which are invalid/masked positions) unique_groups = np.unique(group_ids) for gid in unique_groups: if gid < 0: continue # Skip invalid positions group group_mask = group_ids == gid group_indices = np.where(group_mask)[0] group_size = len(group_indices) # Update group_size for all members obj['group_size'][group_mask] = group_size # Get positions and backgrounds for this group positions = [ (obj['x'][i], obj['y'][i]) for i in group_indices if np.isfinite(obj['x'][i]) and np.isfinite(obj['y'][i]) ] bg_locals = [ obj['bg_local'][i] for i in group_indices if np.isfinite(obj['x'][i]) and np.isfinite(obj['y'][i]) ] if len(positions) == 0: continue # Perform grouped extraction. The grouped helper shares a # single PSF across the group, so for a spatial FWHM model # we evaluate it at the group mean and pass the scalar. if fwhm_is_callable and psf is None: xs_g = np.array([p[0] for p in positions]) ys_g = np.array([p[1] for p in positions]) grp_fwhm = float(np.median(_fwhm_at(fwhm, xs_g, ys_g))) psf_for_group = grp_fwhm else: psf_for_group = psf_for_extraction results = _grouped_optimal_extraction( image1, err, positions, psf_for_group, bg_local=bg_locals, mask=mask | mask0, radius=aper, ) # Store results valid_idx = 0 for i in group_indices: if np.isfinite(obj['x'][i]) and np.isfinite(obj['y'][i]): res = results[valid_idx] obj['flux'][i] = res[0] obj['fluxerr'][i] = res[1] obj['npix_optimal'][i] = res[2] obj['norm_optimal'][i] = res[3] obj['chi2_optimal'][i] = res[4] valid_idx += 1 log( 'Processed %d groups (%d isolated, %d grouped)' % ( len(unique_groups), np.sum(obj['group_size'] == 1), np.sum(obj['group_size'] > 1), ) ) else: # Standard single-source optimal extraction for i, o in enumerate(obj): if np.isfinite(o['x']) and np.isfinite(o['y']): if fwhm_is_callable and psf is None: psf_here = float(fwhm(o['x'], o['y'])) else: psf_here = psf_for_extraction res = _optimal_extraction( image1, err, o['x'], o['y'], psf_here, bg_local=o['bg_local'], mask=mask | mask0, # Do not count the flux from 'soft-masked' pixels radius=aper, # Use aperture radius as clipping radius ) ( o['flux'], o['fluxerr'], o['npix_optimal'], o['norm_optimal'], o['chi2_optimal'], ) = res # Flag objects where optimal extraction failed obj['flags'][~np.isfinite(obj['flux'])] |= 0x800 # Flag objects with poor fits (chi2 > 1000 indicates numerical instability or crowding) # These typically occur in very crowded groups where the matrix becomes ill-conditioned bad_chi2 = np.isfinite(obj['chi2_optimal']) & (obj['chi2_optimal'] > 1000) obj['flags'][bad_chi2] |= 0x1000 else: # Standard aperture photometry # Use just a minimal mask here so that the flux from 'soft-masked' (e.g. saturated) pixels is still counted obj['flux'] = np.nan obj['fluxerr'] = np.nan if apertures is not None: res = photutils.aperture.aperture_photometry(image1, apertures, error=err, mask=mask0) obj['flux'][valid_idx] = res['aperture_sum'] obj['fluxerr'][valid_idx] = res['aperture_sum_err'] # Subtract local background obj['flux'][valid_pos] -= obj['bg_local'][valid_pos] * obj['npix_aper'][valid_pos] obj = _compute_magnitudes_and_filter(obj, sn, keep_negative, log) if get_bg: return obj, bg_est_bg, err else: return obj
def _get_sep_psf(psf, fwhm, log): """Convert a PSF model to sep.PSF object for use with sep.psf_fit(). Accepts sep.PSF objects (returned as-is) or PSFEx dicts from ``stdpipe.psf.run_psfex()`` / ``load_psf()``. Parameters ---------- psf : sep.PSF or dict PSF model (sep.PSF or PSFEx dict). fwhm : float FWHM in pixels (unused, reserved for future use). log : callable Logging function. Returns ------- sep_psf : sep.PSF SEP PSF object. """ import sep if isinstance(psf, sep.PSF): log('Using provided sep.PSF model (FWHM=%.2f)' % psf.fwhm) return psf if isinstance(psf, dict) and 'data' in psf and 'sampling' in psf: # PSFEx-like dict structure from run_psfex / load_psf log( 'Converting PSFEx model to sep.PSF (FWHM=%.2f, sampling=%.3f, degree=%d)' % (psf['fwhm'], psf['sampling'], psf.get('degree', 0)) ) sep_psf = sep.PSF( psf['data'], sampling=psf['sampling'], degree=psf.get('degree', 0), x0=psf.get('x0', 0), y0=psf.get('y0', 0), sx=psf.get('sx', 1), sy=psf.get('sy', 1), fwhm=psf['fwhm'], ) return sep_psf raise TypeError("Unsupported PSF type: %s. Expected sep.PSF or PSFEx dict." % type(psf))
[docs] def measure_objects_sep( obj, image, aper=3, bkgann=None, fwhm=None, psf=None, optimal=False, group_sources=True, group_factor=2.0, group_radius_factor=1.0, group_halo_factor=1.2, maxiter=20, fit_positions=True, fit_radius=0.0, damp_snthresh=0.0, mask=None, bg=None, err=None, gain=None, bg_size=64, sn=None, centroid_iter=0, centroid_psf=None, keep_negative=True, get_bg=False, clip_sigma=3.0, clip_iters=5, verbose=False, ): """Photometry at the positions of already detected objects using SEP routines. Uses SEP's built-in features for optimal extraction with sigma-clipped background (via ``sum_circle_optimal``), PSF fitting photometry (via ``sep.psf_fit``), and iterative centroiding (via ``winpos``). Only available if SEP version 1.4+ with these features is installed. Parameters ---------- obj : astropy.table.Table Table with initial object detections to be measured. image : ndarray Input image as a NumPy array. aper : float Circular aperture radius in pixels for flux measurement. bkgann : tuple of float or None Background annulus (inner, outer radii) for local background. For optimal extraction, SEP handles this internally with sigma-clipping. For aperture photometry, ``sep.stats_circann()`` is used. fwhm : float, callable, or None If provided, ``aper`` and ``bkgann`` are measured in units of FWHM. Also used for Gaussian PSF in optimal extraction, PSF fitting, and centroiding. A callable (e.g. a :class:`stdpipe.photometry.FWHMMap`) is accepted and evaluated at each source position: per-source aperture/bkgann arrays feed the SEP photometry routines and the per-source width feeds ``sep.sum_circle_optimal``. The scalar summary ``float(fwhm)`` is used for the SEP windowed centroider (which requires a scalar sigma). psf : dict, sep.PSF, or None PSF model for PSF fitting photometry. When provided, PSF fitting is used instead of aperture or optimal extraction. Can be a PSFEx dict from ``stdpipe.psf.run_psfex()`` or a ``sep.PSF`` object. optimal : bool If True, use optimal extraction via ``sep.sum_circle_optimal()``. Requires ``fwhm``. Ignored when ``psf`` is provided. group_sources : bool If True, use grouped fitting for optimal or PSF photometry. group_factor : float Local fitting halo factor for grouped PSF fitting. Only used for PSF fitting when ``group_sources=True``; SEP determines PSF-fit connectivity from direct fit-support overlap. group_radius_factor : float Connectivity scale for grouped optimal extraction. Only used for optimal extraction when ``group_sources=True``. The default 1.0 groups sources whose apertures overlap. group_halo_factor : float or None Local context halo scale for grouped optimal extraction. The default 1.2 keeps a modest expanded solve context without widening source connectivity. ``None`` lets SEP use ``group_radius_factor``. maxiter : int Maximum number of PSF fitting iterations. fit_positions : bool If True, fit source positions during PSF fitting. fit_radius : float If > 0, only pixels within this radius (in pixels) of the source center participate in PSF fitting. 0 means use the full PSF stamp. Values of 2-3x FWHM reduce scatter in crowded fields. damp_snthresh : float S/N threshold for damping PSF fit updates. mask : ndarray of bool or None Image mask (True values are masked). bg : ndarray or None If provided, use this background instead of automatically computed one. err : ndarray or None Image noise map. gain : float or None Image gain in e-/ADU, used to build image noise model. bg_size : int Background grid size in pixels. sn : float or None Minimal S/N ratio for filtering. centroid_iter : int Number of centroiding iterations (uses SEP's built-in iteration). centroid_psf : dict, sep.PSF, or None PSF model for centroiding. If None, uses ``psf`` if available, otherwise Gaussian windowed centroiding. keep_negative : bool If False, measurements with negative fluxes will be discarded. get_bg : bool If True, also return estimated background and noise images. clip_sigma : float Sigma value for clipping when ``bkgann`` is provided. clip_iters : int Maximum number of clipping iterations when ``bkgann`` is provided. Set to 0 to disable clipping. verbose : bool or callable Whether to show verbose messages. May be boolean or print-like function. Returns ------- obj : astropy.table.Table Copy of table with ``flux``, ``fluxerr``, ``mag`` and ``magerr`` columns from SEP measurements. When ``psf`` is provided, also includes ``x_psf``, ``y_psf`` (fitted positions), ``chi2_psf``, ``niter_psf``, and ``flags_psf`` columns. bg_image : ndarray Background image (only returned if ``get_bg=True``). err_image : ndarray Error image (only returned if ``get_bg=True``). Notes ----- Quality flags set in the ``flags`` column: - 0x200: At least one aperture pixel is masked (aperture/optimal paths) - 0x400: Invalid position - 0x800: Optimal extraction failed (NaN result) - 0x1000: PSF fit returned non-zero quality flag, or PSF fit failed (NaN result) - 0x2000: Large centroid shift during PSF fit (>1 pixel) """ import sep if not _HAS_SEP_OPTIMAL: raise RuntimeError( "measure_objects_sep() requires SEP version 1.4+ with sum_circle_optimal() and " "stats_circann() functions. Please upgrade SEP or use measure_objects() instead." ) # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if not len(obj): log('No objects to measure') return obj # Operate on the copy of the list obj = obj.copy() image1, mask0, mask = _prepare_image_and_mask(image, mask) # SEP requires C-contiguous arrays image1 = np.ascontiguousarray(image1) if bg is None or err is None or get_bg: log('Estimating global background with SEP') bg_est = sep.Background(image1, mask=mask | mask0, bw=bg_size, bh=bg_size) bg_est_bg = bg_est.back() bg_est_rms = bg_est.rms() else: bg_est = None if bg is None: log( 'Subtracting global background: median %.1f rms %.2f' % (np.median(bg_est_bg), np.std(bg_est_bg)) ) image1 -= bg_est_bg else: log( 'Subtracting user-provided background: median %.1f rms %.2f' % (np.median(bg), np.std(bg)) ) image1 -= bg image1[mask0] = 0 if err is None: log( 'Using global background noise map: median %.1f rms %.2f' % ( np.median(bg_est_rms), np.std(bg_est_rms), ) ) err = bg_est_rms else: log('Using user-provided noise map: median %.1f rms %.2f' % (np.median(err), np.std(err))) # Ensure error map is C-contiguous err = np.ascontiguousarray(err) fwhm_s = _fwhm_scalar(fwhm) fwhm_is_callable = _is_callable_fwhm(fwhm) if fwhm_s is not None: if fwhm_is_callable: log('Scaling aperture radii with spatial FWHM (median %.1f pix)' % fwhm_s) else: log('Scaling aperture radii with FWHM %.1f pix' % fwhm_s) # Scalar summary is used only for log output and as a fallback; the # actual per-source aperture radii are built below from fwhm_at_pos. aper_pix = aper * fwhm_s else: aper_pix = aper log('Using aperture radius %.1f pixels' % aper_pix) # Centroiding with SEP windowed positions (built-in iteration) if centroid_iter: # sep.winpos takes scalar sig/maxstep — use the scalar summary. maxstep = 0.2 * fwhm_s if fwhm_s else 0.6 # Build sep.PSF for PSF-weighted centroiding if requested sep_centroid_psf = None if centroid_psf is not None: sep_centroid_psf = _get_sep_psf(centroid_psf, fwhm, log) log('Using SEP PSF-weighted centroiding (maxstep=%.2f pix)' % maxstep) elif psf is not None: # Use the PSF model already provided for fitting sep_centroid_psf = _get_sep_psf(psf, fwhm, log) log('Using SEP PSF-weighted centroiding from psf model (maxstep=%.2f pix)' % maxstep) else: log('Using SEP Gaussian windowed centroiding (maxstep=%.2f pix)' % maxstep) # Keep original pixel positions obj['x_orig'] = np.array(obj['x']) obj['y_orig'] = np.array(obj['y']) x_vals, y_vals, valid_pos = _extract_valid_positions(obj) if np.any(valid_pos): winpos_kwargs = dict( mask=mask | mask0, maxstep=maxstep, ) if sep_centroid_psf is not None: winpos_kwargs['psf'] = sep_centroid_psf else: sigma = fwhm_s / 2.355 if fwhm_s else 1.0 winpos_kwargs['sig'] = sigma xwin, ywin, flag = sep.winpos( image1, x_vals[valid_pos], y_vals[valid_pos], **winpos_kwargs, ) # Update positions where centroiding succeeded and shift is reasonable total_shift = np.sqrt((xwin - x_vals[valid_pos]) ** 2 + (ywin - y_vals[valid_pos]) ** 2) max_total_shift = fwhm_s if fwhm_s else 3.0 # Reject shifts larger than 1 FWHM good_centroid = ( (flag == 0) & np.isfinite(xwin) & np.isfinite(ywin) & (total_shift < max_total_shift) ) update_indices = np.where(valid_pos)[0][good_centroid] obj['x'][update_indices] = xwin[good_centroid] obj['y'][update_indices] = ywin[good_centroid] if verbose: n_failed = np.sum(~good_centroid) n_converged = np.sum(good_centroid) median_shift = np.median(total_shift[good_centroid]) if np.any(good_centroid) else 0 log( f' Centroiding: {n_converged} converged, {n_failed} failed/rejected, ' f'median shift={median_shift:.3f} pix' ) if 'flags' not in obj.keys(): obj['flags'] = 0 x_vals, y_vals, valid_pos = _extract_valid_positions(obj) # Flag invalid positions obj['flags'][~valid_pos] |= 0x400 if psf is not None and optimal: raise ValueError("'psf' and 'optimal' are mutually exclusive") obj['flux'] = np.nan obj['fluxerr'] = np.nan if np.any(valid_pos): # Evaluate FWHM at the valid source positions. For a callable # ``fwhm``, this becomes a per-source array fed directly to SEP's # broadcasted APIs; scalar ``fwhm`` remains a single float. fwhm_at_pos = None if fwhm_is_callable: fwhm_at_pos = np.asarray(fwhm(x_vals[valid_pos], y_vals[valid_pos]), dtype=float) # Prepare per-source aperture radius. if fwhm_at_pos is not None: aper_arr = aper * fwhm_at_pos else: aper_arr = aper_pix # Prepare bkgann in pixels. bkgann_pix = None if bkgann is not None and len(bkgann) == 2: if fwhm_at_pos is not None: bkgann_pix = ( bkgann[0] * fwhm_at_pos, bkgann[1] * fwhm_at_pos, ) elif fwhm_s is not None: bkgann_pix = (bkgann[0] * fwhm_s, bkgann[1] * fwhm_s) else: bkgann_pix = bkgann if psf is not None: # PSF fitting photometry using sep.psf_fit() sep_psf = _get_sep_psf(psf, fwhm, log) log( 'Using SEP PSF fitting ' '(grouped=%s, group_factor=%.2g, fit_positions=%s, ' 'maxiter=%d, fit_radius=%.1f, damp_snthresh=%.2f)' % (group_sources, group_factor, fit_positions, maxiter, fit_radius, damp_snthresh) ) # Build keyword arguments for sep.psf_fit psf_kwargs = dict( err=err, gain=gain, mask=(mask | mask0).astype(np.uint8), grouped=group_sources, group_factor=group_factor, maxiter=maxiter, fit_positions=fit_positions, fit_radius=fit_radius, damp_snthresh=damp_snthresh, ) flux, fluxerr, xfit, yfit, flag, chi2, niter = sep.psf_fit( image1, x_vals[valid_pos], y_vals[valid_pos], sep_psf, **psf_kwargs, ) obj['flux'][valid_pos] = flux obj['fluxerr'][valid_pos] = fluxerr # Store fitted positions obj['x_psf'] = np.nan obj['y_psf'] = np.nan obj['x_psf'][valid_pos] = xfit obj['y_psf'][valid_pos] = yfit # Store PSF fit quality metrics obj['chi2_psf'] = np.nan obj['chi2_psf'][valid_pos] = chi2 obj['niter_psf'] = 0 obj['niter_psf'][valid_pos] = niter # Store PSF fit flags obj['flags_psf'] = 0 obj['flags_psf'][valid_pos] = flag # Flag sources where PSF fit returned non-zero flag obj['flags'][np.where(valid_pos)[0][flag > 0]] |= 0x1000 # Flag large centroid shifts (>1 pixel) if fit_positions: dx_fit = xfit - x_vals[valid_pos] dy_fit = yfit - y_vals[valid_pos] large_shift = (dx_fit**2 + dy_fit**2) > 1.0 obj['flags'][np.where(valid_pos)[0][large_shift]] |= 0x2000 elif optimal: # Optimal extraction using SEP with built-in background handling if fwhm_s is None: raise ValueError("'fwhm' must be provided for optimal extraction") if bkgann_pix is not None: if fwhm_at_pos is not None: log( 'Using SEP optimal extraction with sigma-clipped background annulus ' '(per-source, median %.1f-%.1f)' % (float(np.median(bkgann_pix[0])), float(np.median(bkgann_pix[1]))) ) else: log( 'Using SEP optimal extraction with sigma-clipped background annulus (%.1f, %.1f)' % tuple(bkgann_pix) ) else: log('Using SEP optimal extraction (no local background)') if group_sources: log( 'Grouped optimal extraction enabled ' '(radius_factor=%.2g, halo_factor=%s)' % ( group_radius_factor, ( 'SEP default' if group_halo_factor is None else '%.2g' % group_halo_factor ), ) ) # SEP accepts scalar or per-source arrays for aper and fwhm and # broadcasts them (fwhm array disables the grouped fast path but # still uses the generic grouped C solver). flux, fluxerr, flag = sep.sum_circle_optimal( image1, x_vals[valid_pos], y_vals[valid_pos], aper_arr, fwhm=fwhm_at_pos if fwhm_at_pos is not None else fwhm_s, err=err, gain=gain if gain else 1.0, mask=mask | mask0, bkgann=bkgann_pix, grouped=group_sources, group_radius_factor=group_radius_factor, group_halo_factor=group_halo_factor, clip_sigma=clip_sigma, clip_iters=clip_iters, ) obj['flux'][valid_pos] = flux obj['fluxerr'][valid_pos] = fluxerr obj['flags'][np.where(valid_pos)[0][flag > 0]] |= 0x200 else: # Standard aperture photometry log('Using standard aperture photometry') flux, fluxerr, flag = sep.sum_circle( image1, x_vals[valid_pos], y_vals[valid_pos], aper_arr, err=err, gain=gain if gain else 1.0, mask=mask | mask0, bkgann=None, # Handle separately for aperture photometry clip_sigma=clip_sigma, clip_iters=clip_iters, ) obj['flux'][valid_pos] = flux obj['fluxerr'][valid_pos] = fluxerr obj['flags'][np.where(valid_pos)[0][flag > 0]] |= 0x200 # For aperture photometry, compute local background separately if bkgann_pix is not None: log('Computing sigma-clipped local background for aperture photometry') bg_mean, bg_std, bg_median, bg_mad, bg_mean_clip, bg_flags = sep.stats_circann( image1, x_vals[valid_pos], y_vals[valid_pos], bkgann_pix[0], bkgann_pix[1], mask=mask | mask0, clip_sigma=clip_sigma, clip_iters=clip_iters, ) # Subtract local background (use median). aper_arr is either # a scalar or a per-source array, both broadcast correctly. aper_area = np.pi * np.asarray(aper_arr) ** 2 obj['flux'][valid_pos] -= bg_median * aper_area # Flag objects where measurement failed if psf is not None: obj['flags'][~np.isfinite(obj['flux'])] |= 0x1000 else: obj['flags'][~np.isfinite(obj['flux'])] |= 0x800 obj = _compute_magnitudes_and_filter(obj, sn, keep_negative, log) if get_bg: return obj, bg_est_bg, err else: return obj
def _aperture_overlap_at_offset(stamp, x_n, y_n, x_t, y_t, aper, subpixel=4): """Fraction of a placed PSF stamp that falls inside a circular aperture centred at an arbitrary position. The PSF stamp is treated as already representing the source at ``(x_n, y_n)`` (built via :func:`stdpipe.psf.get_psf_stamp` with the matching coordinates). The aperture has radius ``aper`` (image pixels) and centre ``(x_t, y_t)``. With ``subpixel >= 2`` the per-pixel aperture coverage is computed by a ``subpixel × subpixel`` sub-grid; ``subpixel=1`` falls back to a pixel-centre mask. """ h, w = stamp.shape x0_img = int(round(x_n)) - w // 2 y0_img = int(round(y_n)) - h // 2 cx = float(x_t) - x0_img cy = float(y_t) - y0_img if int(subpixel) <= 1: yy, xx = np.mgrid[0:h, 0:w] rr2 = (xx - cx) ** 2 + (yy - cy) ** 2 return float(np.sum(stamp[rr2 <= aper * aper])) n = int(subpixel) sub = (np.arange(n) + 0.5) / n - 0.5 dyy, dxx = np.meshgrid(sub, sub, indexing='ij') yy, xx = np.mgrid[0:h, 0:w] rr2 = ( (xx[:, :, None, None] - cx + dxx[None, None, :, :]) ** 2 + (yy[:, :, None, None] - cy + dyy[None, None, :, :]) ** 2 ) frac = np.mean(rr2 <= aper * aper, axis=(2, 3)) return float(np.sum(stamp * frac))
[docs] def measure_aperture_deblended( image, x, y, aper, psf, *, flux_seed, flux_psf, flux_psf_err=None, target=None, fwhm=None, err=None, mask=None, gain=None, bg=None, aperture_correction='ratio_field', correction_field_kwargs=None, ratio_clip=(0.5, 3.0), outlier_clip=5.0, min_for_field_fit=6, bad_flag=0x1000, propagate_neighbour_errors=True, overlap_subpixel=4, diagnostics=False, verbose=False, ): """Aperture photometry with PSF-based neighbour subtraction. For every position in ``(x[target], y[target])`` measure the aperture flux on ``image``, subtract the modelled contribution of every other detection inside that aperture (using each one's PSF-scaled total flux), and add back the target's own modelled contribution. The result is an aperture flux that handles crowding the way PSF photometry would, while remaining robust to PSF-model mismatch on the target itself: PSF-model error feeds only into the small neighbour-subtraction term, partially cancelling between ``model_flux`` and ``self_flux``. The "total flux" used to scale each detection's PSF stamp in the neighbour model is total_i = flux_seed_i * ratio(x_i, y_i) where ``ratio = flux_psf / flux_seed`` is fit as a smooth field across the frame from the per-source ratios. This provides a position-dependent aperture correction even when the PSF model has smoothly varying shape. Parameters ---------- image : 2-D ndarray Background-subtracted image (or pass the background separately via ``bg``). x, y : (N,) array-like Pixel positions of all detections in the region (targets + neighbours used only to model contamination). aper : float Aperture radius. In image pixels by default; interpreted as a multiple of ``fwhm`` when ``fwhm`` is given (matching the convention of :func:`measure_objects`). psf : dict PSF model as returned by :func:`stdpipe.psf.run_psfex`, :func:`stdpipe.psf.load_psf`, or :func:`stdpipe.psf.create_psf_model`. May be position-dependent. flux_seed : (N,) array-like Per-source aperture flux from the upstream detection pass. Used to anchor the smooth aperture-correction ratio field. flux_psf : (N,) array-like Per-source PSF-fitted flux from the upstream PSF-fitting pass. Used to anchor the ratio field and to scale neighbour PSF stamps in the model image. flux_psf_err : (N,) array-like, optional Per-source PSF-fit flux errors. Required when ``propagate_neighbour_errors=True``. target : (N,) bool mask or array of int indices, optional Which detections to measure. Detections not in ``target`` still appear in the neighbour model. Default: every detection. fwhm : float, optional Source FWHM in pixels. When given, ``aper`` is interpreted as a multiple of FWHM. err : 2-D ndarray, optional Per-pixel 1-sigma error map for the input image. Used by ``sep.sum_circle`` to compute the aperture noise. mask : 2-D bool ndarray, optional Bad-pixel mask. ``True`` pixels are excluded from aperture sums. gain : float, optional Detector gain in e-/ADU; forwarded to ``sep.sum_circle`` for Poisson noise on the aperture sum. bg : 2-D ndarray, optional Background map. If given, it is subtracted from ``image`` before the aperture sums (so ``image`` itself need not be background- subtracted). aperture_correction : {'ratio_field', 'fixed', 'none'} How to map ``flux_seed`` to "total flux" when scaling neighbour PSF stamps. ``'ratio_field'`` fits a smooth 2-D field of the per-source PSF/seed ratio (default; see ``correction_field_kwargs`` and ``min_for_field_fit``); ``'fixed'`` uses a single MAD-clipped median ratio; ``'none'`` uses ``flux_seed`` as the total flux. correction_field_kwargs : dict, optional Forwarded to :func:`stdpipe.smoothing.fit_vector_field_2d`. The default uses the LOESS backend with ``k=100`` and per-axis scales chosen from the spread of the input positions, which is a quality upgrade over the rigid 2-D quadratic used in earlier prototypes. ratio_clip : (lo, hi) Hard limits applied to the modelled ratio at every position to guard against extrapolation pathologies. outlier_clip : float MAD multiple used to reject outliers from the per-source ratio sample before fitting the field. min_for_field_fit : int Minimum number of clean ratios required to fit the smooth field; below this, the median ratio is used everywhere. bad_flag : int Flag bit OR'ed into the output ``flags`` for measurements that could not be computed. propagate_neighbour_errors : bool If True (default), inflate ``fluxerr`` by the quadrature sum of each near neighbour's PSF-flux error scaled by its overlap with the target aperture. Requires ``flux_psf_err``. overlap_subpixel : int Sub-pixel sampling factor for the per-neighbour aperture overlap integrals. ``1`` for a faster pixel-centre mask. diagnostics : bool If True, attach extra columns to the output table: ``flux_ap_raw``, ``flux_model``, ``flux_total_model``, ``flux_ratio_model``, ``self_frac``, ``flux_neighbour_err``. verbose : bool or callable Boolean to enable default ``print`` logging, or a print-like callable. Returns ------- astropy.table.Table One row per measured target, in the order they appeared in the input. Columns: ``x``, ``y``, ``flux``, ``fluxerr``, ``mag``, ``magerr``, ``flags``. Diagnostic columns when ``diagnostics=True``. """ from astropy.table import Table from . import psf as psf_mod from . import smoothing import sep log = (verbose if callable(verbose) else print) if verbose else ( lambda *a, **k: None ) x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) flux_seed = np.asarray(flux_seed, dtype=float) flux_psf = np.asarray(flux_psf, dtype=float) if x.shape != y.shape or x.shape != flux_seed.shape \ or x.shape != flux_psf.shape: raise ValueError( "x, y, flux_seed, flux_psf must all have the same shape" ) n_all = x.size if propagate_neighbour_errors and flux_psf_err is None: raise ValueError( "flux_psf_err is required when propagate_neighbour_errors=True" ) if flux_psf_err is not None: flux_psf_err = np.asarray(flux_psf_err, dtype=float) if flux_psf_err.shape != x.shape: raise ValueError("flux_psf_err must have the same shape as x") if target is None: target_idx = np.arange(n_all, dtype=int) else: target = np.asarray(target) if target.dtype == bool: if target.shape != x.shape: raise ValueError("target mask must match the shape of x") target_idx = np.where(target)[0] else: target_idx = target.astype(int) aper_pix = float(aper) * float(fwhm) if fwhm is not None else float(aper) # ------------------------------------------------------------------ # 1. Per-source ratio = flux_psf / flux_seed, MAD-clipped sample. # ------------------------------------------------------------------ with np.errstate(divide='ignore', invalid='ignore'): ratio = flux_psf / flux_seed ratio_ok = ( np.isfinite(ratio) & (ratio > 0) & np.isfinite(flux_seed) & (flux_seed > 0) & np.isfinite(flux_psf) & (flux_psf > 0) ) n_clean_ratio = int(ratio_ok.sum()) if n_clean_ratio: ratio_med = float(np.nanmedian(ratio[ratio_ok])) ratio_mad = float(np.nan if n_clean_ratio < 2 else _astropy_mad_std(ratio[ratio_ok])) if np.isfinite(ratio_mad) and ratio_mad > 0: ratio_ok &= np.abs(ratio - ratio_med) < outlier_clip * ratio_mad else: ratio_med = float('nan') # ------------------------------------------------------------------ # 2. Smooth ratio field (or fall back to fixed median). # ------------------------------------------------------------------ use_field = ( aperture_correction == 'ratio_field' and int(ratio_ok.sum()) >= int(min_for_field_fit) ) if use_field: kwargs = dict(correction_field_kwargs or {}) kwargs.setdefault('backend', 'loess') if kwargs['backend'] == 'loess': xs = float(np.ptp(x[ratio_ok])) or 1.0 ys = float(np.ptp(y[ratio_ok])) or 1.0 kwargs.setdefault('scales', (xs / 4.0, ys / 4.0)) kwargs.setdefault('k', min(100, int(ratio_ok.sum()))) ratio_pred_fn = smoothing.fit_vector_field_2d( x[ratio_ok], y[ratio_ok], ratio[ratio_ok], **kwargs ) log("aperture-correction field fit on %d sources" % int(ratio_ok.sum())) elif aperture_correction == 'none': def ratio_pred_fn(xq, yq): return np.ones_like(np.asarray(xq, float)) else: if aperture_correction == 'ratio_field': log("ratio-field fallback: only %d clean ratios, using fixed median" % int(ratio_ok.sum())) const = ratio_med if np.isfinite(ratio_med) else 1.0 def ratio_pred_fn(xq, yq): return np.full(np.asarray(xq, float).shape, const) model_ratio_all = np.asarray(ratio_pred_fn(x, y), float) bad_ratio = ~np.isfinite(model_ratio_all) if bad_ratio.any(): fill = ratio_med if np.isfinite(ratio_med) else 1.0 model_ratio_all[bad_ratio] = fill lo, hi = ratio_clip np.clip(model_ratio_all, lo, hi, out=model_ratio_all) total_flux = flux_seed * model_ratio_all # ------------------------------------------------------------------ # 3. Build the neighbour model image. # ------------------------------------------------------------------ image_in = np.ascontiguousarray(image, dtype=np.double) if bg is not None: image_in = image_in - np.asarray(bg, dtype=np.double) model_image = np.zeros_like(image_in) place_ok = ( np.isfinite(total_flux) & (total_flux > 0) & np.isfinite(x) & np.isfinite(y) ) for i in np.where(place_ok)[0]: psf_mod.place_psf_stamp( model_image, psf, float(x[i]), float(y[i]), flux=float(total_flux[i]), ) # ------------------------------------------------------------------ # 4. Aperture sums on real and model images at each target. # ------------------------------------------------------------------ x_t = x[target_idx] y_t = y[target_idx] err_in = np.ascontiguousarray(err, dtype=np.double) if err is not None else None mask_in = np.ascontiguousarray(mask, dtype=bool) if mask is not None else None raw_flux, raw_fluxerr, raw_flag = sep.sum_circle( image_in, x_t, y_t, aper_pix, err=err_in, gain=gain, mask=mask_in, ) model_flux, _, _ = sep.sum_circle(model_image, x_t, y_t, aper_pix) # ------------------------------------------------------------------ # 5. Self flux = total_flux_target * enclosed_psf_fraction(target). # ------------------------------------------------------------------ self_frac = np.full(target_idx.size, np.nan) for k, i in enumerate(target_idx): if not place_ok[i]: continue try: self_frac[k] = psf_mod.enclosed_psf_fraction( psf, float(x[i]), float(y[i]), radius=aper_pix, subpixel=overlap_subpixel, ) except Exception: pass self_flux = total_flux[target_idx] * self_frac corr_flux = raw_flux - model_flux + self_flux # ------------------------------------------------------------------ # 6. Optional per-target neighbour error in quadrature. # ------------------------------------------------------------------ neighbour_err = np.zeros(target_idx.size, dtype=float) if propagate_neighbour_errors and flux_psf_err is not None: # PSF radius in image pixels (rough): half the stamp size. psf_radius_pix = 0.5 * float(psf['width'] * psf['sampling']) search_r = psf_radius_pix + aper_pix from scipy.spatial import cKDTree tree = cKDTree(np.c_[x, y]) near_lists = tree.query_ball_point( np.c_[x_t, y_t], r=search_r, ) # Per-source PSF-fit flux error scaled to the same total-flux # units we placed the stamps in. with np.errstate(divide='ignore', invalid='ignore'): ratio_for_err = np.where( flux_psf > 0, model_ratio_all * flux_seed / flux_psf, model_ratio_all, ) ratio_for_err = np.where( np.isfinite(ratio_for_err), ratio_for_err, 1.0 ) total_flux_err = np.abs(flux_psf_err) * np.abs(ratio_for_err) for k, i in enumerate(target_idx): var = 0.0 for j in near_lists[k]: if j == i: continue if not place_ok[j] or not np.isfinite(total_flux_err[j]): continue stamp = psf_mod.get_psf_stamp( psf, float(x[j]), float(y[j]), normalize=True, ) overlap = _aperture_overlap_at_offset( stamp, float(x[j]), float(y[j]), float(x_t[k]), float(y_t[k]), aper_pix, subpixel=overlap_subpixel, ) var += (total_flux_err[j] * overlap) ** 2 neighbour_err[k] = np.sqrt(var) fluxerr = np.sqrt(np.asarray(raw_fluxerr, float) ** 2 + neighbour_err ** 2) else: fluxerr = np.asarray(raw_fluxerr, float) # ------------------------------------------------------------------ # 7. Output table + flags + magnitudes. # ------------------------------------------------------------------ out = Table() out['x'] = x_t out['y'] = y_t out['flux'] = corr_flux out['fluxerr'] = fluxerr out['flags'] = np.asarray(raw_flag, int).copy() bad = ( ~np.isfinite(corr_flux) | ~np.isfinite(fluxerr) | (corr_flux <= 0) | (np.asarray(raw_flag, int) > 0) ) out['flags'][bad] |= int(bad_flag) out['mag'] = np.full(target_idx.size, np.nan) out['magerr'] = np.full(target_idx.size, np.nan) good = ~bad out['mag'][good] = -2.5 * np.log10(corr_flux[good]) out['magerr'][good] = (2.5 / np.log(10)) * fluxerr[good] / corr_flux[good] if diagnostics: out['flux_ap_raw'] = np.asarray(raw_flux, float) out['flux_model'] = np.asarray(model_flux, float) out['flux_total_model'] = total_flux[target_idx] out['flux_ratio_model'] = model_ratio_all[target_idx] out['self_frac'] = self_frac if propagate_neighbour_errors: out['flux_neighbour_err'] = neighbour_err return out
def _astropy_mad_std(x): from astropy.stats import mad_std return float(mad_std(x))