"""
Routines for object detection and photometry.
"""
import os, shutil, tempfile, shlex
import numpy as np
from astropy.wcs import WCS
from astropy.io import fits
from astropy.stats import mad_std, sigma_clipped_stats
from astropy.table import Table
import warnings
from astropy.wcs import FITSFixedWarning
# warnings.simplefilter(action='ignore', category=FITSFixedWarning)
# warnings.simplefilter(action='ignore', category=FutureWarning)
import sep
import photutils
import photutils.background
import photutils.aperture
import photutils.segmentation
import photutils.detection
# Put these to common namespace
from .photometry_model import match, make_sn_model, get_detection_limit_sn, format_color_term
from .photometry_measure import measure_objects
# Conditionally import SEP-based photometry if available
try:
from .photometry_measure import measure_objects_sep, _HAS_SEP_OPTIMAL
except ImportError:
_HAS_SEP_OPTIMAL = False
from .photometry_background import (
get_background,
get_background_percentile,
get_background_morphology,
estimate_background_rms_percentile,
estimate_background_rms_local,
)
try:
import cv2
# Much faster dilation
dilate = lambda image, mask: cv2.dilate(image.astype(np.uint8), mask).astype(bool)
except:
from scipy.signal import fftconvolve
dilate = lambda image, mask: fftconvolve(image, mask, mode='same') > 0.9
from . import utils
[docs]
def make_kernel(r0=1.0, ext=1.0):
x, y = np.mgrid[
np.floor(-ext * r0) : np.ceil(ext * r0 + 1),
np.floor(-ext * r0) : np.ceil(ext * r0 + 1),
]
r = np.hypot(x, y)
image = np.exp(-(r**2) / 2 / r0**2)
return image
[docs]
def estimate_fwhm(values, *, good=None, fwhm_range=(1.0, 20.0), min_candidates=5):
"""Robust global FWHM estimate from per-object FWHM measurements.
Uses a sliding-window mode estimator (narrowest quartile interval) that is
resistant to outliers from galaxies, blends, and non-Gaussian tails.
Parameters
----------
values : array-like
1-D array of per-object FWHM values in pixels.
good : array-like of bool, optional
Boolean mask, True = use this object. The caller is responsible for
applying any S/N, flag, or ellipticity cuts before calling, or
encoding them in this mask.
fwhm_range : tuple of float
Hard clip range ``(lo, hi)`` in pixels. Values outside this range
are excluded before the mode calculation.
min_candidates : int
Minimum number of surviving candidates required.
Returns
-------
float
Scalar FWHM estimate in pixels, or ``np.nan`` on failure.
"""
values = np.asarray(values, dtype=float).ravel()
mask = np.isfinite(values)
if good is not None:
mask &= np.asarray(good, dtype=bool).ravel()
mask &= (values >= fwhm_range[0]) & (values < fwhm_range[1])
f = np.sort(values[mask])
if len(f) < min_candidates:
return np.nan
if len(f) == 1:
return float(f[0])
# Sliding-window mode: find the narrowest interval spanning n//4 objects
nw = max(len(f) // 4, 1)
widths = f[nw:] - f[:-nw]
i = int(np.argmin(widths))
return float(0.5 * (f[i] + f[i + nw]))
def _weighted_median(values, weights):
"""Weighted median of *values* using *weights*.
Both arrays must already be filtered to finite, positive entries.
Returns ``np.nan`` if fewer than 2 values survive.
"""
if len(values) < 2 or not np.any(weights > 0):
return np.nan
idx = np.argsort(values)
cumw = np.cumsum(weights[idx])
cumw /= cumw[-1]
return float(values[idx[np.searchsorted(cumw, 0.5)]])
[docs]
class FWHMMap:
"""Position-dependent FWHM estimator backed by a 2-D polynomial.
Produced by :func:`estimate_fwhm_from_objects` when ``spatial_order >= 1``.
Coordinates are normalised internally to roughly ``[-1, 1]`` for numerical
stability.
Instances are callable: ``fwhm_map(x, y)`` returns FWHM in pixels at the
requested pixel coordinates (``x``, ``y`` may be scalars or broadcastable
arrays; the result has the broadcast shape). Instances are also
convertible via ``float(fwhm_map)`` to the scalar median summary, so they
can be dropped into APIs that still expect a single number.
Attributes
----------
coeffs : ndarray
Flat array of polynomial coefficients, ordered by
``(i, j)`` with ``i + j <= order`` and ``j`` varied fastest.
order : int
Polynomial order.
x0, y0, sx, sy : float
Coordinate normalisation: ``xn = (x - x0) / sx``.
median : float
Representative scalar FWHM (median of the fitted surface over the
good candidates). Used by ``float(self)``.
n_used : int
Number of candidates retained after sigma-clipping.
fwhm_range : tuple of float
Output clipping range in pixels.
"""
__slots__ = (
'coeffs',
'order',
'x0',
'y0',
'sx',
'sy',
'median',
'n_used',
'fwhm_range',
)
def __init__(self, coeffs, order, x0, y0, sx, sy, median, n_used, fwhm_range):
self.coeffs = np.asarray(coeffs, dtype=float).ravel()
self.order = int(order)
self.x0 = float(x0)
self.y0 = float(y0)
self.sx = float(sx)
self.sy = float(sy)
self.median = float(median)
self.n_used = int(n_used)
self.fwhm_range = (float(fwhm_range[0]), float(fwhm_range[1]))
def _terms(self):
for i in range(self.order + 1):
for j in range(self.order + 1 - i):
yield i, j
[docs]
def __call__(self, x, y):
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
xn = (x - self.x0) / self.sx
yn = (y - self.y0) / self.sy
shape = np.broadcast_shapes(xn.shape, yn.shape)
xn = np.broadcast_to(xn, shape)
yn = np.broadcast_to(yn, shape)
out = np.zeros(shape, dtype=float)
for k, (i, j) in enumerate(self._terms()):
out = out + self.coeffs[k] * (xn**i) * (yn**j)
return np.clip(out, self.fwhm_range[0], self.fwhm_range[1])
def __float__(self):
return self.median
def __repr__(self):
return "FWHMMap(order=%d, median=%.3f, n_used=%d)" % (self.order, self.median, self.n_used)
def _fit_spatial_fwhm(
values,
x,
y,
good,
weights=None,
*,
order,
fwhm_range,
image_shape=None,
clip_sigma=3.0,
clip_iters=2,
log=None,
):
"""Fit a 2-D polynomial to per-object FWHM with sigma-clipping.
Returns a :class:`FWHMMap` on success, or ``None`` when fewer than
``3 * nterms`` candidates survive (so callers can fall back to the scalar
path).
"""
_log = log or (lambda *a, **k: None)
values = np.asarray(values, dtype=float)
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
good = np.asarray(good, dtype=bool).copy()
good &= np.isfinite(values) & np.isfinite(x) & np.isfinite(y)
good &= (values >= fwhm_range[0]) & (values < fwhm_range[1])
nterms = (order + 1) * (order + 2) // 2
if good.sum() < 3 * nterms:
_log(
"Spatial FWHM fit: only %d candidates for %d terms — "
"falling back to scalar" % (int(good.sum()), nterms)
)
return None
# Coordinate normalisation
if image_shape is not None:
H, W = image_shape[:2]
x0 = 0.5 * float(W)
y0 = 0.5 * float(H)
sx = max(0.5 * float(W), 1.0)
sy = max(0.5 * float(H), 1.0)
else:
xg, yg = x[good], y[good]
x0 = 0.5 * (xg.min() + xg.max())
y0 = 0.5 * (yg.min() + yg.max())
sx = max(0.5 * (xg.max() - xg.min()), 1.0)
sy = max(0.5 * (yg.max() - yg.min()), 1.0)
xn = (x - x0) / sx
yn = (y - y0) / sy
# Design matrix with i+j<=order, j varying fastest (matches FWHMMap._terms)
cols = []
for i in range(order + 1):
for j in range(order + 1 - i):
cols.append((xn**i) * (yn**j))
D = np.column_stack(cols)
if weights is not None:
w_all = np.asarray(weights, dtype=float).copy()
w_all = np.where(np.isfinite(w_all) & (w_all > 0), w_all, 0.0)
else:
w_all = np.ones_like(values)
mask = good.copy()
coeffs = None
# Some numpy/BLAS builds emit spurious fp warnings from matmul even on
# well-formed inputs; suppress them for the fit's own arithmetic.
with np.errstate(divide='ignore', over='ignore', invalid='ignore'):
for it in range(max(clip_iters, 1)):
m = mask & (w_all > 0)
if m.sum() < nterms:
break
sw = np.sqrt(w_all[m])
A = D[m] * sw[:, None]
b = values[m] * sw
coeffs, *_ = np.linalg.lstsq(A, b, rcond=None)
# Residuals on candidate rows only — D rows outside `good` may
# normalise to large values and overflow at order >= 2.
resid_m = values[m] - D[m] @ coeffs
if it == clip_iters - 1:
break
sig = mad_std(resid_m, ignore_nan=True)
if not np.isfinite(sig) or sig == 0:
break
keep = np.abs(resid_m) < clip_sigma * sig
idx_m = np.flatnonzero(m)
new_mask = np.zeros_like(mask)
new_mask[idx_m[keep]] = True
mask = new_mask
if coeffs is None:
return None
final_m = mask & (w_all > 0)
n_used = int(final_m.sum())
if n_used == 0:
return None
fit_vals_m = np.clip(D[final_m] @ coeffs, fwhm_range[0], fwhm_range[1])
median = float(np.nanmedian(fit_vals_m))
if not np.isfinite(median):
return None
return FWHMMap(
coeffs=coeffs,
order=order,
x0=x0,
y0=y0,
sx=sx,
sy=sy,
median=median,
n_used=n_used,
fwhm_range=fwhm_range,
)
[docs]
def estimate_fwhm_from_objects(
obj,
*,
snr_min=10.0,
max_ellipticity=0.3,
use_flags=True,
fwhm_range=(1.0, 20.0),
min_candidates=5,
spatial_order=0,
xy_cols=('x', 'y'),
image_shape=None,
verbose=False,
):
"""Estimate global FWHM from an object table (SEP or SExtractor).
Extracts per-object FWHM values and builds a quality mask from the
available catalog columns, then delegates to :func:`estimate_fwhm`.
When a ``flux_radius`` column is present (half-light radius, as
produced by :func:`get_objects_sep`), it is converted to an
equivalent Gaussian FWHM via ``FWHM = 2 * flux_radius`` and used
as the primary estimator. Half-light radii are 2--3× more stable
than SEP's Gaussian-core FWHM (relative IQR ~0.2 vs ~0.7) and far
less susceptible to bias from sub-stellar contaminants, because the
measurement integrates flux over many pixels rather than fitting a
slope to a handful of core pixels.
When ``flux_radius`` is not available, the function falls back to
the ``fwhm`` / ``FWHM_IMAGE`` column or moment-derived FWHM,
and applies an ``a*b``-weighted median correction if the mode
appears biased (weighted median exceeds mode by > 10 %).
Parameters
----------
obj : `~astropy.table.Table` or structured ndarray
Detection results. Recognized columns (checked in priority order):
``flux_radius`` (half-light radius), ``fwhm``, ``FWHM_IMAGE``,
or derived from ``a``/``b`` (``A_IMAGE``/``B_IMAGE``) moments.
snr_min : float or None
Minimum S/N for candidate selection (via ``magerr``).
Set to None to disable.
max_ellipticity : float or None
Maximum ellipticity ``1 - b/a``. Set to None to disable.
use_flags : bool
If True, reject objects with nonzero flags.
fwhm_range : tuple of float
Passed through to :func:`estimate_fwhm`.
min_candidates : int
Passed through to :func:`estimate_fwhm`.
spatial_order : int
Polynomial order for a position-dependent FWHM model. ``0`` (default)
preserves the historical behaviour and returns a scalar. ``>=1``
attempts to fit a 2-D polynomial ``FWHM(x, y)`` to the per-object
values and returns a :class:`FWHMMap` callable. If too few candidates
survive the quality cuts for the requested order, the function falls
back to the scalar path.
xy_cols : tuple of str
Column names used for pixel coordinates when ``spatial_order >= 1``.
Defaults to ``('x', 'y')`` (SEP-style). Use ``('X_IMAGE', 'Y_IMAGE')``
for SExtractor output. Falls back to the scalar path if either column
is missing.
image_shape : tuple of int, optional
``(H, W)`` image shape used to normalise pixel coordinates to roughly
``[-1, 1]`` when ``spatial_order >= 1``. If not provided, the min/max
of the candidate coordinates are used.
verbose : bool or callable
If True, log diagnostic information about the estimation.
Returns
-------
float or FWHMMap
Scalar FWHM in pixels when ``spatial_order == 0``. A :class:`FWHMMap`
callable otherwise, or a scalar on fallback. Returns ``np.nan`` if no
usable candidates survive.
"""
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
names = obj.dtype.names if hasattr(obj, 'dtype') else obj.colnames
# --- Extract shape columns for ab-weighting (if available) ---
ab = None
if 'a' in names and 'b' in names:
a = np.asarray(obj['a'], dtype=float)
b = np.asarray(obj['b'], dtype=float)
ab = a * b
elif 'A_IMAGE' in names and 'B_IMAGE' in names:
a = np.asarray(obj['A_IMAGE'], dtype=float)
b = np.asarray(obj['B_IMAGE'], dtype=float)
ab = a * b
# --- Build quality mask ---
good = np.ones(len(obj), dtype=bool)
if snr_min is not None:
if 'magerr' in names:
good &= np.isfinite(obj['magerr']) & (obj['magerr'] < 1.0 / snr_min)
good &= np.asarray(obj['magerr'], dtype=float) > 0
elif 'MAGERR_APER' in names:
magerr = np.asarray(obj['MAGERR_APER'], dtype=float)
if magerr.ndim > 1:
magerr = magerr[:, 0]
good &= np.isfinite(magerr) & (magerr < 1.0 / snr_min) & (magerr > 0)
if max_ellipticity is not None:
if ab is not None:
with np.errstate(invalid='ignore'):
good &= (a > 0) & ((1.0 - b / a) < max_ellipticity)
elif 'A_IMAGE' in names and 'B_IMAGE' in names:
a_ = np.asarray(obj['A_IMAGE'], dtype=float)
b_ = np.asarray(obj['B_IMAGE'], dtype=float)
with np.errstate(invalid='ignore'):
good &= (a_ > 0) & ((1.0 - b_ / a_) < max_ellipticity)
if use_flags:
if 'flags' in names:
good &= np.asarray(obj['flags']) == 0
elif 'flag' in names:
good &= np.asarray(obj['flag']) == 0
elif 'FLAGS' in names:
good &= np.asarray(obj['FLAGS']) == 0
# --- Spatial (polynomial) path ---
# When requested, attempt a 2-D polynomial fit of FWHM(x, y) using the
# same priority order for per-object FWHM proxies (flux_radius → fwhm →
# moments). Falls back to the scalar path if coordinates are missing or
# too few candidates survive.
if spatial_order >= 1:
xcol, ycol = xy_cols
if xcol in names and ycol in names:
xcoord = np.asarray(obj[xcol], dtype=float)
ycoord = np.asarray(obj[ycol], dtype=float)
fr_col_s = (
'flux_radius'
if 'flux_radius' in names
else 'FLUX_RADIUS'
if 'FLUX_RADIUS' in names
else None
)
if fr_col_s is not None:
sp_values = 2.0 * np.asarray(obj[fr_col_s], dtype=float)
sp_source = 'flux_radius'
elif 'fwhm' in names:
sp_values = np.asarray(obj['fwhm'], dtype=float)
sp_source = 'fwhm'
elif 'FWHM_IMAGE' in names:
sp_values = np.asarray(obj['FWHM_IMAGE'], dtype=float)
sp_source = 'FWHM_IMAGE'
elif ab is not None:
sp_values = 2.0 * np.sqrt(np.log(2) * (a**2 + b**2))
sp_source = 'moments'
elif 'A_IMAGE' in names and 'B_IMAGE' in names:
a_s = np.asarray(obj['A_IMAGE'], dtype=float)
b_s = np.asarray(obj['B_IMAGE'], dtype=float)
sp_values = 2.0 * np.sqrt(np.log(2) * (a_s**2 + b_s**2))
sp_source = 'moments'
else:
sp_values = None
if sp_values is not None:
fmap = _fit_spatial_fwhm(
sp_values,
xcoord,
ycoord,
good,
weights=ab,
order=spatial_order,
fwhm_range=fwhm_range,
image_shape=image_shape,
log=log,
)
if fmap is not None:
log(
"Spatial FWHM (%s, order=%d): median=%.2f, "
"n_used=%d" % (sp_source, spatial_order, fmap.median, fmap.n_used)
)
return fmap
else:
log(
"Spatial FWHM requested but coord columns %r/%r missing — "
"falling back to scalar" % (xcol, ycol)
)
# --- Primary path: flux_radius (half-light radius) ---
# Gaussian: FWHM = 2 * R_half. This slightly underestimates for
# Moffat profiles but the mode estimator naturally centres on the
# stellar peak, compensating for the conversion factor difference.
fr_col = (
'flux_radius'
if 'flux_radius' in names
else 'FLUX_RADIUS'
if 'FLUX_RADIUS' in names
else None
)
if fr_col is not None:
fr = np.asarray(obj[fr_col], dtype=float)
fwhm_fr = 2.0 * fr
fwhm_fr_mode = estimate_fwhm(
fwhm_fr,
good=good,
fwhm_range=fwhm_range,
min_candidates=min_candidates,
)
if np.isfinite(fwhm_fr_mode):
# Cross-check against ab-weighted median
if ab is not None:
valid = (
good
& np.isfinite(fwhm_fr)
& (fwhm_fr >= fwhm_range[0])
& (fwhm_fr < fwhm_range[1])
)
fwhm_fr_weighted = _weighted_median(fwhm_fr[valid], ab[valid])
else:
fwhm_fr_weighted = np.nan
if np.isfinite(fwhm_fr_weighted):
ratio = fwhm_fr_weighted / fwhm_fr_mode
if ratio > 1.1:
log(
"FWHM (flux_radius): mode=%.2f, ab-weighted median=%.2f "
"(ratio %.2f) — using weighted median"
% (fwhm_fr_mode, fwhm_fr_weighted, ratio)
)
return fwhm_fr_weighted
else:
log(
"FWHM (flux_radius): mode=%.2f, ab-weighted median=%.2f "
"(ratio %.2f) — consistent" % (fwhm_fr_mode, fwhm_fr_weighted, ratio)
)
else:
log("FWHM (flux_radius): mode=%.2f" % fwhm_fr_mode)
return fwhm_fr_mode
# --- Fallback: FWHM column or moment-derived FWHM ---
if 'fwhm' in names:
values = np.asarray(obj['fwhm'], dtype=float)
elif 'FWHM_IMAGE' in names:
values = np.asarray(obj['FWHM_IMAGE'], dtype=float)
elif 'a' in names and 'b' in names:
values = 2.0 * np.sqrt(
np.log(2)
* (np.asarray(obj['a'], dtype=float) ** 2 + np.asarray(obj['b'], dtype=float) ** 2)
)
elif 'A_IMAGE' in names and 'B_IMAGE' in names:
values = 2.0 * np.sqrt(
np.log(2)
* (
np.asarray(obj['A_IMAGE'], dtype=float) ** 2
+ np.asarray(obj['B_IMAGE'], dtype=float) ** 2
)
)
else:
return np.nan
fwhm_mode = estimate_fwhm(
values, good=good, fwhm_range=fwhm_range, min_candidates=min_candidates
)
# ab-weighted median as guard against sub-stellar contamination
if ab is not None:
valid = good & np.isfinite(values) & (values >= fwhm_range[0]) & (values < fwhm_range[1])
fwhm_weighted = _weighted_median(values[valid], ab[valid])
else:
fwhm_weighted = np.nan
if np.isfinite(fwhm_mode) and np.isfinite(fwhm_weighted):
ratio = fwhm_weighted / fwhm_mode
if ratio > 1.1:
log(
"FWHM: mode=%.2f, ab-weighted median=%.2f (ratio %.2f) "
"— mode likely biased by sub-stellar objects, using weighted median"
% (fwhm_mode, fwhm_weighted, ratio)
)
return fwhm_weighted
else:
log(
"FWHM: mode=%.2f, ab-weighted median=%.2f (ratio %.2f) — consistent"
% (fwhm_mode, fwhm_weighted, ratio)
)
return fwhm_mode
elif np.isfinite(fwhm_weighted):
return fwhm_weighted
else:
return fwhm_mode
[docs]
def get_objects_sep(
image,
header=None,
mask=None,
mask_detect=None,
err=None,
thresh=4.0,
aper=3.0,
bkgann=None,
r0=0.5,
gain=1,
edge=0,
minnthresh=2,
minarea=5,
relfluxradius=2.0,
wcs=None,
bg_size=64,
fwhm=False,
optimal=False,
group_sources=True,
group_radius_factor=1.0,
group_halo_factor=1.2,
centroid=False,
centroid_sigma_scale=1.0,
centroid_maxstep_scale=1.0,
centroid_maxshift_scale=0.5,
use_mask_large=False,
subtract_bg=True,
npix_large=100,
sn=10.0,
get_segmentation=False,
deblend_fwhm=0,
deblend_method='watershed',
clip_sigma=3.0,
clip_iters=5,
fwhm_spatial_order=0,
verbose=True,
**kwargs,
):
"""Object detection and aperture/optimal photometry using
`SEP <https://github.com/kbarbary/sep>`_.
Signature is as similar as possible to
:func:`~stdpipe.photometry.get_objects_sextractor`.
Algorithm: background estimation → object detection with optional
smoothing and deblending (watershed by default) → optional windowed
centroiding → edge rejection → per-object FWHM estimation → aperture
or optimal extraction photometry → S/N quality cut.
Detection flags are documented at
https://sep.readthedocs.io/en/v1.1.x/reference.html —
they differ from SExtractor flags.
Parameters
----------
image : numpy.ndarray
Input 2-D image.
header : astropy.io.fits.Header, optional
Image header.
mask : numpy.ndarray of bool, optional
Pixel mask (True = masked). Used for background estimation and
photometry. Objects whose detection footprint overlaps this mask
are flagged with 0x100 but not excluded from detection.
mask_detect : numpy.ndarray of bool, optional
Detection mask (True = exclude from detection). Combined (OR) with
non-finite pixel mask. Use this to exclude entire regions (e.g. bad
columns, satellite trails). If None, only non-finite pixels are
masked during detection.
err : numpy.ndarray, optional
Noise map.
thresh : float
Detection threshold in sigmas above local background.
aper : float
Aperture radius in pixels, or in FWHM units when *fwhm* is
provided/estimated.
bkgann : tuple of float, optional
Background annulus ``(r_in, r_out)`` in pixels (or FWHM units).
Uses arithmetic mean of unmasked pixels inside the annulus.
If None, the global background model is used.
r0 : float
Smoothing kernel sigma (pixels) for detection.
gain : float
Detector gain, e-/ADU.
edge : int
Reject objects closer to image edge than this many pixels.
minnthresh : int
Minimum number of pixels above threshold per detection.
minarea : int
Minimum number of pixels in an object footprint.
relfluxradius : float
Multiplier for rough FWHM when computing ``flux_radius`` search
radius (internal use).
wcs : astropy.wcs.WCS, optional
WCS for sky coordinate assignment.
bg_size : int
Background grid cell size in pixels.
fwhm : bool or float
FWHM handling. ``False`` (default) — per-object FWHM for output
only. ``True`` — estimate global FWHM and scale *aper*/*bkgann*
(interpreted as FWHM units). Numeric — use this value directly
and scale *aper*/*bkgann*.
optimal : bool
Use optimal extraction (``sep.sum_circle_optimal``) instead of
aperture photometry. Requires FWHM. SEP 1.4+ only.
group_sources : bool
Grouped optimal extraction for overlapping sources.
Default True (recommended). SEP 1.4+ only.
group_radius_factor : float
Connectivity scale for grouped optimal extraction. The default 1.0
groups sources whose apertures overlap.
group_halo_factor : float or None
Local context halo scale for grouped optimal extraction. The default
1.2 keeps a modest expanded solve context without widening source
connectivity. ``None`` lets SEP use ``group_radius_factor``.
centroid : bool
Refine positions via ``sep.winpos``. Default False (windowed
positions can be biased in crowded fields). With SEP 1.4+, uses
``maxstep = 0.2 * FWHM``.
centroid_sigma_scale : float
Multiplicative scale applied to the Gaussian sigma passed to
``sep.winpos``. The default 1.0 keeps the historical behavior
``sigma = FWHM / 2.355``. Values below 1 use a tighter window;
values above 1 use a broader window.
centroid_maxstep_scale : float
Multiplicative scale applied to the SEP 1.4+ ``maxstep`` limit
used by ``sep.winpos``. The default 1.0 keeps the historical
cap ``maxstep = 0.2 * FWHM``. Values below 1 restrict the allowed
centroid motion more strongly; values above 1 loosen it.
centroid_maxshift_scale : float
Multiplicative scale applied to the SEP 1.4+ ``maxshift`` cap on
the *total* centroid drift across all winpos iterations (in units
of FWHM). The default 0.5 means a centroid can move at most
``0.5 * FWHM`` from its initial position, complementing the
per-iteration ``maxstep`` cap. Set to a large value to effectively
disable. Ignored when SEP is older than 1.4 or when FWHM is not
available.
use_mask_large : bool
Filter out objects with footprints larger than *npix_large*.
npix_large : int
Pixel-count threshold for large-object rejection.
subtract_bg : bool
Subtract background before detection (default True).
sn : float
Minimum S/N (``magerr < 1/sn``) for output.
get_segmentation : bool
Also return the segmentation map and add ``seg_id`` column.
deblend_fwhm : float
Fixed Gaussian FWHM (pixels) for deterministic deblending.
0 = adaptive shapes with stochastic assignment. SEP 1.4+ only.
deblend_method : {'watershed', 'threshold'}
Deblending algorithm. SEP 1.4+ only.
clip_sigma : float
Sigma-clipping threshold for annulus background. SEP 1.4+ only.
clip_iters : int
Max sigma-clipping iterations for annulus background.
0 = disable. SEP 1.4+ only.
fwhm_spatial_order : int
When FWHM is auto-estimated (``fwhm=True`` or via the optimal path),
fit a 2-D polynomial ``FWHM(x, y)`` of this order and apply it
per source. ``0`` (default) keeps the historical scalar behaviour.
Per-source values are used for aperture/background-annulus scaling
and for the optimal-extraction PSF width; the windowed centroider
still uses the scalar median. Ignored when a numeric ``fwhm`` is
supplied directly. See :class:`FWHMMap`.
verbose : bool or callable
Print progress messages.
Returns
-------
astropy.table.Table or tuple
Table of detected objects. If *get_segmentation* is True, returns
``(table, segmentation_map)``.
**Columns:** x, y, xerr, yerr, flux, fluxerr, mag, magerr,
flags, ra, dec, bg, fwhm, a, b, theta, and optionally seg_id.
**Metadata:** aper, bkgann, optimal, group_sources, fwhm_phot.
Notes
-----
- FWHM estimation prefers SEP's built-in method (ported from
SExtractor). Falls back to 2nd moments for older SEP versions.
- Optimal extraction assumes a Gaussian PSF.
- Grouped extraction fits nearby sources simultaneously — dramatically
improves accuracy for close pairs.
- Watershed deblending often gives better completeness for pairs
closer than ~2 FWHM.
"""
# Simple Wrapper around print for logging in verbose mode only
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if r0 > 0.0:
kernel = make_kernel(r0)
else:
kernel = None
log("Preparing background mask")
if mask is None:
mask = np.zeros_like(image, dtype=bool)
mask_bg = np.zeros_like(mask)
mask_segm = np.zeros_like(mask)
# Detection mask: non-finite pixels plus optional user-provided detection
# mask. The main user mask is not passed to sep.extract() so that bright
# objects with partially masked cores (e.g. saturated pixels) are detected
# and deblended as single objects rather than being fragmented into
# ring-shaped artifacts. The user mask is still used for background
# estimation and photometry. Use mask_detect to exclude entire regions
# (e.g. bad columns, satellite trails) from detection.
mask_det = ~np.isfinite(image)
if mask_detect is not None:
mask_det = mask_det | mask_detect
log("Building background map")
bg = sep.Background(image, mask=mask | mask_bg, bw=bg_size, bh=bg_size)
if subtract_bg:
image1 = image - bg.back()
else:
image1 = image.copy()
if err is None:
err = bg.rms()
err[~np.isfinite(err)] = 1e30
err[err == 0] = 1e30
sep.set_extract_pixstack(image.shape[0] * image.shape[1])
if use_mask_large:
# Mask regions around huge objects as they are most probably corrupted by saturation and blooming
log("Extracting initial objects")
# Minimal extraction, just to get large objects
obj0, segm = sep.extract(
image1,
err=err,
thresh=thresh,
minarea=minarea,
mask=mask_det | mask_bg,
filter_kernel=kernel,
segmentation_map=True,
)
log("Dilating large objects")
mask_segm = np.isin(
segm, [_ + 1 for _, npix in enumerate(obj0['npix']) if npix > npix_large]
)
mask_segm = dilate(mask_segm, np.ones([10, 10]))
log("Extracting final objects")
extract_kwargs = {
'err': err,
'thresh': thresh,
'minarea': minarea,
'mask': mask_det | mask_bg | mask_segm,
'filter_kernel': kernel,
'segmentation_map': True,
}
# Add deblending parameters if available (SEP 1.4+)
if _HAS_SEP_OPTIMAL:
extract_kwargs['deblend_fwhm'] = deblend_fwhm
extract_kwargs['deblend_method'] = deblend_method
# Merge with any additional kwargs
extract_kwargs.update(kwargs)
obj0, segm = sep.extract(image1, **extract_kwargs)
# Flag objects whose footprints overlap the user mask.
# 0x100 = object has masked pixels within its detection footprint.
if mask.any():
# For each segmentation label, check if any of its pixels are masked
# Use fast vectorized approach: find unique labels that overlap mask
masked_labels = np.unique(segm[mask & (segm > 0)])
# seg_id for object i is i+1 (1-based)
obj_seg_ids = np.arange(1, len(obj0) + 1)
has_masked = np.isin(obj_seg_ids, masked_labels)
obj0['flag'][has_masked] |= 0x100
# Compute half-light radius for all extracted objects. This is done
# early so that estimate_fwhm_from_objects can use it for FWHM
# estimation (flux_radius is more stable than SEP's Gaussian-core FWHM).
_x0 = obj0['x']
_y0 = obj0['y']
_a0 = np.maximum(obj0['a'], 0.5)
_b0 = np.maximum(obj0['b'], 0.5)
_theta0 = obj0['theta']
_kronrad0, _ = sep.kron_radius(
image1,
_x0,
_y0,
_a0,
_b0,
_theta0,
6.0,
mask=mask | mask_bg | mask_segm,
)
_kronrad0 = np.maximum(_kronrad0, 1.0)
_rmax0 = np.clip(relfluxradius * _kronrad0 * np.maximum(_a0, _b0), 3.0, 50.0)
_flux_auto0, _, _ = sep.sum_circle(
image1,
_x0,
_y0,
_rmax0,
mask=mask | mask_bg | mask_segm,
)
_flux_radius0, _ = sep.flux_radius(
image1,
_x0,
_y0,
_rmax0,
0.5,
normflux=_flux_auto0,
subpix=5,
)
_flux_radius0 = np.asarray(_flux_radius0, dtype=float)
# Build a minimal Table from obj0 so estimate_fwhm_from_objects
# can see flux_radius alongside the standard SEP columns.
_obj0_for_fwhm = Table(
{
'fwhm': obj0['fwhm']
if 'fwhm' in obj0.dtype.names
else 2.0 * np.sqrt(np.log(2) * (obj0['a'] ** 2 + obj0['b'] ** 2)),
'flux_radius': _flux_radius0,
'a': obj0['a'],
'b': obj0['b'],
'x': obj0['x'],
'y': obj0['y'],
'flags': obj0['flag'],
}
)
# Handle FWHM parameter. ``fwhm_value`` ends up being either a scalar
# float or a :class:`FWHMMap` callable when ``fwhm_spatial_order >= 1``.
fwhm_value = None
scale_with_fwhm = False
if isinstance(fwhm, (int, float)) and not isinstance(fwhm, bool) and fwhm > 0:
# Numeric FWHM value provided directly
fwhm_value = float(fwhm)
scale_with_fwhm = True
log("Using provided FWHM = %.2g" % fwhm_value)
elif isinstance(fwhm, FWHMMap):
# User-supplied spatial FWHM model
fwhm_value = fwhm
scale_with_fwhm = True
log("Using provided spatial FWHM model (median=%.2g)" % fwhm_value.median)
elif fwhm is True or (optimal and fwhm is False):
# Estimate FWHM from detected objects using robust mode estimator
# (either explicitly requested or needed for optimal extraction)
fwhm_value = estimate_fwhm_from_objects(
_obj0_for_fwhm,
snr_min=None,
use_flags=True,
spatial_order=fwhm_spatial_order,
image_shape=image.shape,
verbose=verbose,
)
# FWHMMap is always "finite" (has a median); np.isfinite rejects it.
if isinstance(fwhm_value, FWHMMap):
pass
elif not np.isfinite(fwhm_value):
# Fall back to plain median of moment-based FWHM for unflagged objects
idx_fwhm = obj0['flag'] == 0
if 'fwhm' in obj0.dtype.names:
fwhm_value = np.nanmedian(obj0['fwhm'][idx_fwhm])
else:
fwhm_value = np.nanmedian(
2.0 * np.sqrt(np.log(2) * (obj0['a'][idx_fwhm] ** 2 + obj0['b'][idx_fwhm] ** 2))
)
log("Robust FWHM estimation failed, falling back to median")
if fwhm is True:
# Scale aperture/annulus when explicitly requested (fwhm=True)
scale_with_fwhm = True
if isinstance(fwhm_value, FWHMMap):
log(
"Estimated spatial FWHM (order=%d, median=%.2g)"
% (fwhm_value.order, fwhm_value.median)
)
else:
log("Estimated FWHM = %.2g" % fwhm_value)
# Stash the FWHM-units base so we can evaluate per-source apertures at
# the photometry call sites when fwhm_value is a FWHMMap.
aper_base = aper
bkgann_base = bkgann
fwhm_is_spatial = isinstance(fwhm_value, FWHMMap)
# Scale aperture and background annulus with FWHM if requested
if scale_with_fwhm and fwhm_value is not None:
if fwhm_is_spatial:
log("Scaling aperture and background annulus per source via spatial FWHM")
log(" Aperture (median): %.2g pixels" % (aper * fwhm_value.median))
if bkgann is not None:
log(
" Background annulus (median): (%.2g, %.2g) pixels"
% (bkgann[0] * fwhm_value.median, bkgann[1] * fwhm_value.median)
)
else:
log("Scaling aperture and background annulus with FWHM")
aper *= fwhm_value
if bkgann is not None:
bkgann = (bkgann[0] * fwhm_value, bkgann[1] * fwhm_value)
log(" Aperture: %.2g pixels" % aper)
if bkgann is not None:
log(" Background annulus: (%.2g, %.2g) pixels" % bkgann)
# Windowed positional parameters - use if requested
if centroid:
log("Refining positions using windowed centroiding")
# sep.winpos takes a scalar sigma/maxstep, so use the median summary
# of the spatial model when one is in play.
fwhm_scalar = float(fwhm_value) if fwhm_value is not None else None
# Constrain winpos to each source's own segmentation footprint.
# Without this, sep.winpos is segmentation-blind and deblended
# sub-objects sharing a flux peak (e.g. saturated cores split into
# fragments) all converge to the same point. Negative seg_id tells
# SEP to mask every pixel except those belonging to that source.
winpos_kwargs = {
'mask': mask | mask_bg | mask_segm,
'segmap': segm if segm.dtype == np.int32 else segm.astype(np.int32),
'seg_id': -np.arange(1, len(obj0) + 1, dtype=np.int32),
}
if _HAS_SEP_OPTIMAL:
# SEP 1.4+ has maxstep parameter to cap position shift per iteration
# Set to 0.2 * FWHM to avoid excessive drift in degenerate cases
if fwhm_scalar is not None:
maxstep = 0.2 * fwhm_scalar * float(centroid_maxstep_scale)
else:
# Default to 1.0 pixel if FWHM not available
maxstep = 1.0 * float(centroid_maxstep_scale)
if not np.isfinite(maxstep) or maxstep <= 0:
raise ValueError("centroid_maxstep_scale should produce a positive finite maxstep")
winpos_kwargs['maxstep'] = maxstep
# SEP 1.4+ also has maxshift parameter to cap the *total* drift
# across all winpos iterations. Belt-and-braces alongside the
# segmap restriction: bounds runaway centroids even when a source
# spans a large segment (e.g. saturated cores with bloom trails).
if fwhm_scalar is not None:
maxshift = fwhm_scalar * float(centroid_maxshift_scale)
if not np.isfinite(maxshift) or maxshift <= 0:
raise ValueError("centroid_maxshift_scale should produce a positive finite maxshift")
winpos_kwargs['maxshift'] = maxshift
sig = fwhm_scalar / 2.355 if fwhm_scalar is not None else 0.5
sig *= float(centroid_sigma_scale)
if not np.isfinite(sig) or sig <= 0:
raise ValueError("centroid_sigma_scale should produce a positive finite sigma")
xwin, ywin, flag_win = sep.winpos(image1, obj0['x'], obj0['y'], sig, **winpos_kwargs)
# Drop sep.APER_HASMASKED: our segmap restriction (seg_id=-N) masks every
# pixel outside each source's footprint, so the winpos aperture always
# overlaps masked pixels by construction. Keep TRUNC / ALLMASKED /
# NONPOSITIVE — those still convey real centroiding-quality info.
flag_win &= ~np.int_(sep.APER_HASMASKED)
else:
# Use initial positions from extraction
xwin, ywin = obj0['x'], obj0['y']
flag_win = np.zeros(len(obj0), dtype=int)
# Filter out objects too close to frame edges
idx = (
(np.round(xwin) > edge)
& (np.round(ywin) > edge)
& (np.round(xwin) < image.shape[1] - edge)
& (np.round(ywin) < image.shape[0] - edge)
) # & (obj0['flag'] == 0)
if minnthresh:
idx &= obj0['tnpix'] >= minnthresh
log("Measuring final objects")
# Build per-source aperture / background annulus / PSF-width arrays when
# a spatial FWHM model is in use. Aperture / bkgann only vary per source
# when the user asked for FWHM-unit scaling; the optimal PSF width follows
# the spatial model whenever one is available.
if fwhm_is_spatial:
_fwhm_pos = fwhm_value(xwin[idx], ywin[idx])
else:
_fwhm_pos = None
if scale_with_fwhm and fwhm_is_spatial:
aper_phot = aper_base * _fwhm_pos
if bkgann_base is not None:
bkgann_phot = (
bkgann_base[0] * _fwhm_pos,
bkgann_base[1] * _fwhm_pos,
)
else:
bkgann_phot = None
else:
aper_phot = aper
bkgann_phot = bkgann
if fwhm_is_spatial:
fwhm_phot_arg = _fwhm_pos
elif fwhm_value is not None:
fwhm_phot_arg = float(fwhm_value)
else:
fwhm_phot_arg = None
# Choose photometry method
if optimal and _HAS_SEP_OPTIMAL:
if group_sources:
log(
"Using grouped optimal extraction "
"(radius_factor=%.2g, halo_factor=%s)"
% (
group_radius_factor,
("SEP default" if group_halo_factor is None else "%.2g" % group_halo_factor),
)
)
else:
log("Using optimal extraction")
flux, fluxerr, flag = sep.sum_circle_optimal(
image1,
xwin[idx],
ywin[idx],
aper_phot,
fwhm=fwhm_phot_arg,
err=err,
gain=gain,
mask=mask | mask_bg | mask_segm,
bkgann=bkgann_phot,
grouped=group_sources,
group_radius_factor=group_radius_factor,
group_halo_factor=group_halo_factor,
clip_sigma=clip_sigma,
clip_iters=clip_iters,
)
else:
if optimal and not _HAS_SEP_OPTIMAL:
log(
"WARNING: Optimal extraction requested but SEP 1.4+ not available, falling back to aperture photometry"
)
flux, fluxerr, flag = sep.sum_circle(
image1,
xwin[idx],
ywin[idx],
aper_phot,
err=err,
gain=gain,
mask=mask | mask_bg | mask_segm,
bkgann=bkgann_phot,
clip_sigma=clip_sigma,
clip_iters=clip_iters,
)
# For debug purposes, let's make also the same aperture photometry on the background map
bgflux, bgfluxerr, bgflag = sep.sum_circle(
bg.back(),
xwin[idx],
ywin[idx],
aper_phot,
err=bg.rms(),
gain=gain,
mask=mask | mask_bg | mask_segm,
clip_sigma=clip_sigma,
clip_iters=clip_iters,
)
bgnorm = bgflux / np.pi / np.asarray(aper_phot) ** 2
# Fluxes to magnitudes
mag, magerr = np.zeros_like(flux), np.zeros_like(flux)
mag[flux > 0] = -2.5 * np.log10(flux[flux > 0])
# magerr[flux>0] = 2.5*np.log10(1.0 + fluxerr[flux>0]/flux[flux>0])
magerr[flux > 0] = 2.5 / np.log(10) * fluxerr[flux > 0] / flux[flux > 0]
# FWHM estimation - prefer SEP's built-in FWHM (ported from SExtractor)
# Fall back to 2nd moments for older SEP versions
if 'fwhm' in obj0.dtype.names:
# Use SEP's built-in FWHM (nearly identical to SExtractor)
fwhm = obj0['fwhm'][idx]
else:
# Fall back to 2nd moments: FWHM = 2 * sqrt(ln(2) * (a^2 + b^2))
# This is exact for Gaussian PSF, approximate for others
# More robust than flux_radius (5-27x fewer outliers) but less accurate than SEP built-in
fwhm = 2.0 * np.sqrt(np.log(2) * (obj0['a'][idx] ** 2 + obj0['b'][idx] ** 2))
# Reuse the half-light radius already computed for all objects
_flux_radius = _flux_radius0[idx]
flag |= obj0['flag'][idx] | flag_win[idx]
# Quality cuts
fidx = (flux > 0) & (magerr < 1.0 / sn)
if wcs is None and header is not None:
# If header is provided, we may build WCS from it
wcs = WCS(header)
if wcs is not None:
# If WCS is provided we may convert x,y to ra,dec
ra, dec = wcs.all_pix2world(obj0['x'][idx], obj0['y'][idx], 0)
else:
ra, dec = np.zeros_like(obj0['x'][idx]), np.zeros_like(obj0['y'][idx])
if verbose:
log("All done")
obj = Table(
{
'x': xwin[idx][fidx],
'y': ywin[idx][fidx],
'xerr': np.sqrt(obj0['errx2'][idx][fidx]),
'yerr': np.sqrt(obj0['erry2'][idx][fidx]),
'flux': flux[fidx],
'fluxerr': fluxerr[fidx],
'mag': mag[fidx],
'magerr': magerr[fidx],
'flags': obj0['flag'][idx][fidx] | flag[fidx],
'ra': ra[fidx],
'dec': dec[fidx],
'bg': bgnorm[fidx],
'fwhm': fwhm[fidx],
'flux_radius': _flux_radius[fidx],
'a': obj0['a'][idx][fidx],
'b': obj0['b'][idx][fidx],
'theta': obj0['theta'][idx][fidx],
}
)
if scale_with_fwhm and fwhm_is_spatial:
obj.meta['aper'] = aper_base * fwhm_value.median
if bkgann_base is not None:
obj.meta['bkgann'] = (
bkgann_base[0] * fwhm_value.median,
bkgann_base[1] * fwhm_value.median,
)
else:
obj.meta['bkgann'] = None
else:
obj.meta['aper'] = aper
obj.meta['bkgann'] = bkgann
obj.meta['optimal'] = optimal and _HAS_SEP_OPTIMAL
obj.meta['group_sources'] = group_sources and optimal and _HAS_SEP_OPTIMAL
if obj.meta['group_sources']:
obj.meta['group_radius_factor'] = group_radius_factor
obj.meta['group_halo_factor'] = group_halo_factor
if fwhm_value is not None:
if fwhm_is_spatial:
obj.meta['fwhm_phot'] = fwhm_value.median
obj.meta['fwhm_phot_model'] = fwhm_value
else:
obj.meta['fwhm_phot'] = fwhm_value
obj.sort('flux', reverse=True)
if get_segmentation:
# Segmentation map ID (sequential object number + 1, 1-based indexing)
seg_id = np.arange(len(obj0['x'])) + 1
obj['seg_id'] = seg_id[idx][fidx]
return obj, segm
else:
return obj
def _empty_table(get_segmentation=False):
"""
Return empty table with correct structure.
Parameters
----------
get_segmentation : bool
If True, return tuple (table, None), otherwise just table
Returns
-------
table : astropy.table.Table
Empty table with standard columns
segm : None (optional)
None segmentation map if get_segmentation=True
"""
obj = Table()
obj['x'] = []
obj['y'] = []
obj['flux'] = []
obj['fluxerr'] = []
obj['mag'] = []
obj['magerr'] = []
obj['fwhm'] = []
obj['a'] = []
obj['b'] = []
obj['theta'] = []
obj['bg'] = []
obj['flags'] = []
obj['xerr'] = []
obj['yerr'] = []
if get_segmentation:
return obj, None
else:
return obj
[docs]
class ConstantBackground:
def __init__(self, value, rms, shape):
self.background = np.full(shape, value)
self.background_rms = np.full(shape, rms)
[docs]
def get_value(val):
"""Get array from either Quantity or ndarray"""
if hasattr(val, 'value'):
return val.value
else:
return np.asarray(val)
[docs]
def get_objects_photutils(
image,
header=None,
mask=None,
mask_detect=None,
err=None,
# Detection parameters
thresh=2.0,
method='segmentation',
deblend=True,
minarea=5,
saturation=None,
# Segmentation-specific parameters
npixels=5,
nlevels=32,
contrast=0.001,
connectivity=8,
# StarFinder-specific parameters
fwhm=3.0,
sharplo=0.2,
sharphi=1.0,
roundlo=-1.0,
roundhi=1.0,
# Photometry parameters
aper=3.0,
bkgann=None,
# Background parameters
bg_size=64,
subtract_bg=True,
# Filtering parameters
edge=0,
sn=3.0,
# Output control
wcs=None,
get_segmentation=False,
verbose=False,
**kwargs,
):
"""
Detect sources in image using photutils detection algorithms.
This function provides an alternative to get_objects_sep() using
photutils instead of SEP. It supports multiple detection methods
including segmentation-based detection with deblending and
point source detection using DAOStarFinder or IRAFStarFinder.
Parameters
----------
image : numpy.ndarray
2D image array
header : astropy.io.fits.Header, optional
FITS header for WCS information
mask : numpy.ndarray, optional
Boolean mask (True = masked pixel). Used for background estimation,
photometry, and flagging (0x100). Not used for detection itself.
mask_detect : numpy.ndarray, optional
Boolean mask for detection (True = exclude from detection). OR-ed with
non-finite pixel mask. If None, only non-finite pixels are excluded
from detection.
err : numpy.ndarray, optional
Error/uncertainty map. If None, uses background RMS
thresh : float, optional
Detection threshold in sigma units (default: 2.0)
method : str, optional
Detection method: 'segmentation', 'dao', or 'iraf' (default: 'segmentation')
deblend : bool, optional
Apply deblending for segmentation method (default: True)
minarea : int, optional
Minimum source area in pixels (default: 5)
saturation : float, optional
Saturation threshold for flagging saturated sources. Sources with
peak pixel values exceeding this threshold will have flag 0x004 set.
If None (default), saturation detection is disabled.
npixels : int, optional
Minimum connected pixels for segmentation (default: 5)
nlevels : int, optional
Number of deblending levels (default: 32)
contrast : float, optional
Deblending contrast threshold (default: 0.001)
connectivity : int, optional
Pixel connectivity (4 or 8) for segmentation (default: 8)
fwhm : float, optional
FWHM for StarFinder methods in pixels (default: 3.0)
sharplo : float, optional
Lower sharpness bound for StarFinder (default: 0.2)
sharphi : float, optional
Upper sharpness bound for StarFinder (default: 1.0)
roundlo : float, optional
Lower roundness bound for StarFinder (default: -1.0)
roundhi : float, optional
Upper roundness bound for StarFinder (default: 1.0)
aper : float, optional
Aperture radius in pixels (default: 3.0)
bkgann : tuple, optional
Background annulus (inner, outer) radii in pixels
bg_size : int, optional
Background estimation box size (default: 64)
subtract_bg : bool, optional
Subtract background before detection (default: True)
edge : int, optional
Edge exclusion in pixels (default: 0)
sn : float, optional
Minimum S/N ratio (default: 3.0)
wcs : astropy.wcs.WCS, optional
WCS for coordinate conversion. If None, extracted from header
get_segmentation : bool, optional
Return segmentation map (only for method='segmentation') (default: False)
verbose : bool or callable, optional
Verbose output (default: False)
**kwargs
Additional parameters passed to photutils functions
Returns
-------
table : astropy.table.Table
Table of detected sources with columns:
- x, y: pixel coordinates (0-indexed)
- xerr, yerr: position errors
- flux, fluxerr: aperture flux and error
- mag, magerr: instrumental magnitude and error
- fwhm: FWHM estimate
- a, b: semi-major and semi-minor axes
- theta: position angle
- bg: local background
- flags: detection flags
- ra, dec: WCS coordinates (if WCS available)
segm : photutils.segmentation.SegmentationImage, optional
Segmentation map (only if get_segmentation=True and method='segmentation')
Notes
-----
The function follows the same workflow as get_objects_sep():
1. Validate inputs and create mask from NaNs
2. Estimate background using Background2D
3. Optionally subtract background
4. Detect sources using selected method
5. Measure aperture photometry
6. Apply quality filters
7. Add WCS coordinates if available
8. Return table with standard columns
Detection methods:
- 'segmentation': Uses detect_sources() with optional deblending.
Best for extended sources and crowded fields.
- 'dao': Uses DAOStarFinder for point sources.
Best for stellar fields with well-defined PSF.
- 'iraf': Uses IRAFStarFinder for IRAF-compatible detection.
Similar to 'dao' but with IRAF-style parameters.
Examples
--------
Basic segmentation detection:
>>> obj = get_objects_photutils(image, thresh=3.0, aper=5.0)
Segmentation with deblending:
>>> obj = get_objects_photutils(
... image, method='segmentation', deblend=True,
... nlevels=32, contrast=0.001
... )
Point source detection:
>>> obj = get_objects_photutils(
... image, method='dao', fwhm=3.0, thresh=5.0
... )
With background annulus:
>>> obj = get_objects_photutils(
... image, aper=5.0, bkgann=(10.0, 15.0)
... )
Get segmentation map:
>>> obj, segm = get_objects_photutils(
... image, method='segmentation', get_segmentation=True
... )
"""
# Validate inputs
if image.ndim != 2:
raise ValueError("Image must be 2D array")
# Setup logging
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
# Create detection mask from NaNs and optional mask_detect
mask_det = ~np.isfinite(image)
if mask_detect is not None:
mask_det = mask_det | np.asarray(mask_detect, dtype=bool)
if mask is None:
mask = np.zeros(image.shape, dtype=bool)
else:
mask = np.array(mask).astype(bool)
# Saturation map
if saturation is not None:
log(f'Using saturation level {saturation:.2f}')
smask = image >= saturation
# Let's subtract saturation mask from user-provided one so that it does not prevent proper segmentation
mask = mask & (~smask)
else:
smask = np.zeros(image.shape, dtype=bool)
total_mask = mask | mask_det | smask
# Extract WCS from header if provided
if wcs is None and header is not None:
try:
wcs = WCS(header)
if not wcs.is_celestial:
wcs = None
except Exception:
wcs = None
# Estimate background using photutils
log(f'Estimating background with {bg_size}x{bg_size} grid')
try:
bkg = photutils.background.Background2D(
image,
box_size=bg_size,
mask=total_mask,
bkg_estimator=photutils.background.ModeEstimatorBackground(),
exclude_percentile=10,
)
except Exception as e:
log(f'Warning: Background estimation failed: {e}')
# Fallback to simple median
bkg = ConstantBackground(
np.nanmedian(image[~total_mask]) if np.any(~total_mask) else 0.0,
np.nanstd(image[~total_mask]) if np.any(~total_mask) else 1.0,
image.shape,
)
# These things are not cached internally, so let's avoid re-computing them
bkg_back = bkg.background
bkg_rms = bkg.background_rms
# Optionally subtract background
if subtract_bg:
image_bgsub = image - bkg.background
log(
f'Subtracting background: median {np.nanmedian(bkg_back):.2f}, '
f'rms {np.nanmedian(bkg_rms):.2f}'
)
else:
image_bgsub = image.copy()
# Create or use error map
if err is None:
err = bkg_rms
log('Using background RMS as error map')
else:
log('Using provided error map')
# Initialize variables for different methods
segm = None
catalog = None
sources = None
# Source detection based on different methods
if method == 'segmentation':
# Detect sources via segmentation
threshold = thresh * err
log(f'Detecting sources with threshold {thresh} sigma, at least {npixels} pixels')
segm = photutils.segmentation.detect_sources(
image_bgsub,
threshold=threshold,
npixels=npixels,
connectivity=connectivity,
mask=mask_det,
)
if segm is None or segm.nlabels == 0:
log('No sources detected')
return _empty_table(get_segmentation)
log(f'Detected {segm.nlabels} initial segments')
# Optionally deblend sources
if deblend and segm.nlabels > 0:
log(f'Deblending with nlevels {nlevels}, contrast {contrast}')
try:
segm_deblend = photutils.segmentation.deblend_sources(
image_bgsub, segm, npixels=npixels, nlevels=nlevels, contrast=contrast
)
segm = segm_deblend
log(f'Deblended to {segm.nlabels} sources')
except Exception as e:
log(f'Warning: Deblending failed: {e}')
# Extract source properties
catalog = photutils.segmentation.SourceCatalog(
image_bgsub,
segm,
error=err,
mask=mask_det,
background=bkg_back if subtract_bg else None,
)
# Convert to arrays (handle both Quantity and ndarray)
x = get_value(catalog.xcentroid)
y = get_value(catalog.ycentroid)
area = get_value(catalog.area)
elif method in ['dao', 'iraf']:
# Calculate threshold
threshold_value = thresh * np.nanmedian(err)
# Select appropriate finder
if method == 'dao':
log(f'Using DAOStarFinder with fwhm={fwhm}, threshold={thresh} sigma')
finder = photutils.detection.DAOStarFinder(
fwhm=fwhm,
threshold=threshold_value,
sharplo=sharplo,
sharphi=sharphi,
roundlo=roundlo,
roundhi=roundhi,
exclude_border=True,
)
else: # method == 'iraf'
log(f'Using IRAFStarFinder with fwhm={fwhm}, threshold={thresh} sigma')
finder = photutils.detection.IRAFStarFinder(
fwhm=fwhm,
threshold=threshold_value,
sharplo=sharplo,
sharphi=sharphi,
roundlo=roundlo,
roundhi=roundhi,
exclude_border=True,
)
# Find sources
try:
sources = finder(image_bgsub, mask=mask_det)
except Exception as e:
log(f'Warning: Source detection failed: {e}')
sources = None
if sources is None or len(sources) == 0:
log('No sources detected')
return _empty_table(get_segmentation=False)
log(f'Detected {len(sources)} sources')
# Extract positions (handle both Quantity and ndarray)
x = get_value(sources['xcentroid'])
y = get_value(sources['ycentroid'])
# StarFinder doesn't provide area, use approximate
area = np.full(len(sources), np.pi * (fwhm / 2) ** 2)
else:
raise ValueError(f"Unknown method: {method}. Use 'segmentation', 'dao', or 'iraf'")
# Aperture photometry
log(f'Performing aperture photometry with {aper} pixels aperture')
# Create apertures at detected positions
positions = np.column_stack([x, y])
apertures = photutils.aperture.CircularAperture(positions, r=aper)
# Optional background annulus
if bkgann is not None:
log(f'Using background annulus: {bkgann}')
bkg_apertures = photutils.aperture.CircularAnnulus(
positions, r_in=bkgann[0], r_out=bkgann[1]
)
# Measure background
bkg_phot = photutils.aperture.aperture_photometry(
image_bgsub, bkg_apertures, error=err, mask=mask
)
bkg_mean = bkg_phot['aperture_sum'] / bkg_apertures.area
# Apply background correction to aperture
phot_table = photutils.aperture.aperture_photometry(
image_bgsub, apertures, error=err, mask=mask
)
# Handle both Quantity and ndarray
aper_sum = get_value(phot_table['aperture_sum'])
aper_sum_err = get_value(phot_table['aperture_sum_err'])
bkg_mean_arr = get_value(bkg_mean)
flux = aper_sum - bkg_mean_arr * apertures.area
fluxerr = aper_sum_err
else:
# Simple aperture photometry
phot_table = photutils.aperture.aperture_photometry(
image_bgsub, apertures, error=err, mask=mask
)
# Handle both Quantity and ndarray
flux = get_value(phot_table['aperture_sum'])
fluxerr = get_value(phot_table['aperture_sum_err'])
# Convert to magnitudes
with np.errstate(divide='ignore', invalid='ignore'):
mag = -2.5 * np.log10(flux)
magerr = 2.5 / np.log(10) * fluxerr / flux
# Build output table
obj = Table()
obj['x'] = x
obj['y'] = y
obj['flux'] = flux
obj['fluxerr'] = fluxerr
obj['mag'] = mag
obj['magerr'] = magerr
if segm is not None:
obj['label'] = catalog.labels
# Add shape parameters
if method == 'segmentation':
# Extract from catalog (handle both Quantity and ndarray)
semimajor = get_value(catalog.semimajor_sigma)
semiminor = get_value(catalog.semiminor_sigma)
orientation = get_value(catalog.orientation)
obj['a'] = semimajor * 2.355 # Convert sigma to FWHM
obj['b'] = semiminor * 2.355
obj['theta'] = orientation
obj['fwhm'] = (obj['a'] + obj['b']) / 2
obj['flags'] = np.zeros(len(obj), dtype=int)
# Set detection flags for segmentation method
try:
# 0x002: Deblended sources
if deblend and segm is not None and hasattr(segm, 'deblended_labels'):
deblended_labels = set(segm.deblended_labels)
obj['flags'][np.isin(obj['label'], deblended_labels)] |= 0x002
# 0x008: Footprint truncated at image boundary
bbox_xmin = get_value(catalog.bbox_xmin)
bbox_xmax = get_value(catalog.bbox_xmax)
bbox_ymin = get_value(catalog.bbox_ymin)
bbox_ymax = get_value(catalog.bbox_ymax)
for i in range(len(obj)):
if (
bbox_xmin[i] == 0
or bbox_xmax[i] >= image.shape[1]
or bbox_ymin[i] == 0
or bbox_ymax[i] >= image.shape[0]
):
obj['flags'][i] |= 0x008
# 0x004: Saturated sources (if saturation threshold provided)
if saturation is not None:
max_val = get_value(catalog.max_value)
obj['flags'][max_val >= saturation] |= 0x004
except Exception as e:
log(f'Warning: Flag detection failed for segmentation: {e}')
else: # StarFinder
obj['fwhm'] = np.full(len(obj), fwhm)
obj['a'] = obj['fwhm']
obj['b'] = obj['fwhm']
obj['theta'] = np.zeros(len(obj))
obj['flags'] = np.zeros(len(obj), dtype=int)
# Set detection flags for StarFinder methods
try:
# 0x010: Poor quality metrics (sharpness/roundness near rejection thresholds)
margin = 0.1 # 10% margin from thresholds
if 'sharpness' in sources.colnames:
sharp = get_value(sources['sharpness'])
poor_sharp = (sharp < sharplo * (1 + margin)) | (sharp > sharphi * (1 - margin))
obj['flags'][poor_sharp] |= 0x010
if 'roundness1' in sources.colnames:
round1 = get_value(sources['roundness1'])
poor_round = (round1 < roundlo * (1 + margin)) | (round1 > roundhi * (1 - margin))
obj['flags'][poor_round] |= 0x010
# 0x008: Truncated at boundary (near edge, even though exclude_border=True)
near_edge = (x <= 1) | (x >= image.shape[1] - 2) | (y <= 1) | (y >= image.shape[0] - 2)
obj['flags'][near_edge] |= 0x008
# 0x004: Saturated sources
if saturation is not None and 'peak' in sources.colnames:
peak = get_value(sources['peak'])
obj['flags'][peak >= saturation] |= 0x004
except Exception as e:
log(f'Warning: Flag detection failed for StarFinder: {e}')
# 0x100: User-masked pixels in footprint (both methods)
try:
if np.any(mask):
if method == 'segmentation':
# Use segmentation map
labels = list(set(segm.data[mask])) # footprints with masked pixels
obj['flags'][np.isin(obj['label'], labels)] |= 0x100
else:
# Check source apertures for masked pixels
positions = np.column_stack([obj['x'], obj['y']])
apertures = photutils.aperture.CircularAperture(positions, r=fwhm / 2)
mres = aperture_photometry(mask.astype(int), apertures, method='center')
obj['flags'][mres['aperture_sum'] > 0] |= 0x100
except Exception as e:
log(f'Warning: Masked pixel detection failed: {e}')
# Add background at each source
if method == 'segmentation':
bg_at_centroid = get_value(catalog.background_centroid)
obj['bg'] = bg_at_centroid
else:
obj['bg'] = [
bkg_back[_y, _x]
if _x >= 0 and _x < image.shape[1] and _y >= 0 and _y < image.shape[0]
else np.nan
for _x, _y in zip(obj['x'].astype(int), obj['y'].astype(int))
]
# Add position errors (simple estimate based on S/N)
with np.errstate(divide='ignore', invalid='ignore'):
positional_error = 0.5 / (flux / fluxerr)
positional_error = np.clip(positional_error, 0.1, 5.0)
obj['xerr'] = positional_error
obj['yerr'] = positional_error
# Add metadata
obj.meta['aper'] = aper
if bkgann is not None:
obj.meta['bkgann'] = bkgann
obj.meta['method'] = method
obj.meta['thresh'] = thresh
if method == 'segmentation':
obj.meta['deblend'] = deblend
# Apply quality filters
log(f'Applying quality filters: edge={edge}, sn={sn}, minarea={minarea}')
# Edge filter
if edge > 0:
idx = (obj['x'] > edge) & (obj['x'] < image.shape[1] - edge)
idx &= (obj['y'] > edge) & (obj['y'] < image.shape[0] - edge)
else:
idx = np.ones(len(obj), dtype=bool)
# Area filter (for segmentation)
if method == 'segmentation':
idx &= area >= minarea
# S/N filter
if sn > 0:
with np.errstate(divide='ignore', invalid='ignore'):
obj_sn = flux / fluxerr
idx &= obj_sn > sn
# Positive flux filter
idx &= flux > 0
# Apply filters
n_before = len(obj)
obj = obj[idx]
# Update segmentation map if needed
if method == 'segmentation' and get_segmentation and segm is not None:
# Filter segmentation to keep only selected sources
# Note: This keeps the original segmentation but user can cross-reference with obj table
pass
log(f'Filtered {n_before} -> {len(obj)} sources')
# Sort by brightness (flux, descending)
if len(obj) > 0:
obj.sort('flux', reverse=True)
# Add RA/Dec if WCS available
if wcs is not None and wcs.has_celestial:
try:
coords = wcs.all_pix2world(obj['x'], obj['y'], 0)
obj['ra'] = coords[0]
obj['dec'] = coords[1]
log('Added WCS coordinates (RA, Dec)')
except Exception as e:
log(f'Warning: WCS conversion failed: {e}')
log(f'Returning {len(obj)} sources')
# Return results
if get_segmentation:
if method == 'segmentation':
return obj, segm.data
else:
log('Warning: get_segmentation=True but method is not segmentation')
return obj, None
else:
return obj