"""
Routines for PSF photometry using photutils.
This module provides PSF fitting photometry as an alternative to aperture
photometry, which is more accurate for point sources especially in crowded
fields or when PSF wings are significant.
"""
import numpy as np
from astropy.table import Table
from astropy.nddata import NDData
from astropy.stats import sigma_clipped_stats
import photutils
import photutils.background
import photutils.psf
from photutils.utils import calc_total_error
from . import photometry as phot
from . import psf as psf_module
# Re-export for backward compatibility
from .psf import create_psf_model
def _odd_int(value, min_value=1):
value = int(np.round(value))
if value < min_value:
value = min_value
if value % 2 == 0:
value += 1
return value
def _compute_oversampling(psf_sampling):
if psf_sampling is None or not np.isfinite(psf_sampling) or psf_sampling <= 0:
return 1
if psf_sampling >= 1.0:
return 1
return max(1, int(np.rint(1.0 / psf_sampling)))
def _compute_native_psf_size(psf_height, psf_sampling):
if psf_sampling is None or not np.isfinite(psf_sampling) or psf_sampling <= 0:
size = psf_height
elif psf_sampling >= 1.0:
size = psf_height
else:
size = psf_height * psf_sampling
return _odd_int(size)
def _scale_psf_image_for_photutils(psf_image, oversampling):
if oversampling is None:
return psf_image
factor = float(oversampling) ** 2
if factor <= 1.0:
return psf_image
return psf_image * factor
def _build_circular_fit_mask(shape, x_positions, y_positions, radius):
mask_fit = np.ones(shape, dtype=bool)
r2 = radius**2
for x, y in zip(x_positions, y_positions):
if not np.isfinite(x) or not np.isfinite(y):
continue
x0 = int(np.floor(x - radius))
x1 = int(np.ceil(x + radius)) + 1
y0 = int(np.floor(y - radius))
y1 = int(np.ceil(y + radius)) + 1
x0 = max(0, x0)
y0 = max(0, y0)
x1 = min(shape[1], x1)
y1 = min(shape[0], y1)
if x1 <= x0 or y1 <= y0:
continue
yy, xx = np.mgrid[y0:y1, x0:x1]
inside = (xx - x) ** 2 + (yy - y) ** 2 <= r2
sub = mask_fit[y0:y1, x0:x1]
sub[inside] = False
return mask_fit
[docs]
class GradientLocalBackground(photutils.background.LocalBackground):
"""
Local background estimator using gradient fitting with sigma-clipping.
Inherits from photutils.background.LocalBackground but overrides the estimation
method to fit polynomial gradients instead of taking mean/median.
Instead of taking mean/median of annulus (assumes flat background),
fits a polynomial model to the annulus and evaluates at source position.
Includes sigma-clipping to reject outliers (contaminating sources).
This dramatically reduces biases with background gradients:
- Linear gradients: ~20× improvement (19% → <1% error)
- Quadratic gradients: ~100-400× improvement (-415% → <5% error)
- Sigma-clipping provides robustness in crowded fields
Parameters
----------
inner_radius : float
Inner radius of annulus in pixels
outer_radius : float
Outer radius of annulus in pixels
order : int, optional
Polynomial order:
0 = constant (mean, equivalent to standard LocalBackground)
1 = plane (linear gradient, recommended)
2 = quadratic surface (complex gradients)
Default is 1.
sigma : float, optional
Sigma threshold for sigma-clipping outliers. Default is 3.0.
Higher values are more permissive, lower values reject more outliers.
maxiters : int, optional
Maximum number of sigma-clipping iterations. Default is 3.
"""
def __init__(self, inner_radius, outer_radius, order=1, sigma=3.0, maxiters=3):
# Initialize parent class with dummy bkg_estimator (we'll override __call__)
super().__init__(inner_radius, outer_radius, bkg_estimator=None)
self.order = order
self.sigma = sigma
self.maxiters = maxiters
def __call__(self, data, x, y, mask=None):
"""
Estimate local background at position(s) (x, y).
Parameters
----------
data : 2D ndarray
Image data
x, y : float or array-like
Source position(s)
mask : 2D bool ndarray, optional
Mask (True = masked)
Returns
-------
bg : float or ndarray
Background value(s) at source position(s)
"""
# Handle scalar vs array input
x = np.atleast_1d(x)
y = np.atleast_1d(y)
scalar_input = len(x) == 1
if mask is None:
mask = np.zeros_like(data, dtype=bool)
bg_values = np.zeros(len(x))
size_y, size_x = data.shape
for i, (xi, yi) in enumerate(zip(x, y)):
# Define bounding box for annulus region (OPTIMIZATION: avoid full-image arrays)
r_outer = self.outer_radius
x0 = max(0, int(xi - r_outer) - 1)
x1 = min(size_x, int(xi + r_outer) + 2)
y0 = max(0, int(yi - r_outer) - 1)
y1 = min(size_y, int(yi + r_outer) + 2)
# Create coordinate grids ONLY for bounding box (not full image)
yy, xx = np.mgrid[y0:y1, x0:x1]
# Distance from source (only compute for bounding box pixels)
dist = np.sqrt((xx - xi) ** 2 + (yy - yi) ** 2)
# Annulus mask
annulus_mask = (dist >= self.inner_radius) & (dist <= self.outer_radius)
if mask is not None:
annulus_mask &= ~mask[y0:y1, x0:x1]
if not np.any(annulus_mask):
# Fallback: return median of unmasked data
bg_values[i] = np.median(data[~mask]) if np.any(~mask) else 0.0
continue
# Get annulus data (extract from bounding box)
x_annulus = xx[annulus_mask].ravel()
y_annulus = yy[annulus_mask].ravel()
data_bbox = data[y0:y1, x0:x1]
z_annulus = data_bbox[annulus_mask].ravel()
# Remove NaN/Inf
valid = np.isfinite(z_annulus)
x_annulus = x_annulus[valid]
y_annulus = y_annulus[valid]
z_annulus = z_annulus[valid]
if len(z_annulus) < max(10, (self.order + 1) * (self.order + 2) // 2):
# Not enough points, fallback to mean
bg_values[i] = np.mean(z_annulus) if len(z_annulus) > 0 else 0.0
continue
# Sigma-clipping to reject outliers (contaminating sources)
# Iteratively fit, compute residuals, reject outliers, refit
good_mask = np.ones(len(z_annulus), dtype=bool)
for iteration in range(self.maxiters):
x_good = x_annulus[good_mask]
y_good = y_annulus[good_mask]
z_good = z_annulus[good_mask]
if len(z_good) < max(10, (self.order + 1) * (self.order + 2) // 2):
# Too many rejections, stop
break
# Fit current good points
if self.order == 0:
# Constant (mean)
bg_fit = np.mean(z_good)
residuals = z_good - bg_fit
elif self.order == 1:
# Plane: z = a + b*(x-x0) + c*(y-y0)
dx = x_good - xi
dy = y_good - yi
A = np.column_stack([np.ones_like(x_good), dx, dy])
try:
coeffs = np.linalg.lstsq(A, z_good, rcond=None)[0]
residuals = z_good - (coeffs[0] + coeffs[1] * dx + coeffs[2] * dy)
except np.linalg.LinAlgError:
bg_fit = np.mean(z_good)
residuals = z_good - bg_fit
elif self.order == 2:
# Quadratic: z = a + b*dx + c*dy + d*dx^2 + e*dy^2 + f*dx*dy
dx = x_good - xi
dy = y_good - yi
A = np.column_stack([np.ones_like(x_good), dx, dy, dx**2, dy**2, dx * dy])
try:
coeffs = np.linalg.lstsq(A, z_good, rcond=None)[0]
residuals = z_good - (
coeffs[0]
+ coeffs[1] * dx
+ coeffs[2] * dy
+ coeffs[3] * dx**2
+ coeffs[4] * dy**2
+ coeffs[5] * dx * dy
)
except np.linalg.LinAlgError:
bg_fit = np.mean(z_good)
residuals = z_good - bg_fit
else:
raise ValueError(f"order={self.order} not supported. Use 0, 1, or 2.")
# Compute sigma from residuals
sigma_residuals = np.std(residuals)
if sigma_residuals == 0:
# Perfect fit or constant values, stop
break
# Find outliers in the FULL dataset (not just current good points)
# Compute residuals for all points using current fit
if self.order == 0:
all_residuals = z_annulus[good_mask] - bg_fit
elif self.order == 1:
dx_all = x_annulus[good_mask] - xi
dy_all = y_annulus[good_mask] - yi
all_residuals = z_annulus[good_mask] - (
coeffs[0] + coeffs[1] * dx_all + coeffs[2] * dy_all
)
elif self.order == 2:
dx_all = x_annulus[good_mask] - xi
dy_all = y_annulus[good_mask] - yi
all_residuals = z_annulus[good_mask] - (
coeffs[0]
+ coeffs[1] * dx_all
+ coeffs[2] * dy_all
+ coeffs[3] * dx_all**2
+ coeffs[4] * dy_all**2
+ coeffs[5] * dx_all * dy_all
)
# Reject outliers beyond sigma threshold
outliers = np.abs(all_residuals) > self.sigma * sigma_residuals
if not np.any(outliers):
# No more outliers, converged
break
# Update mask - create new mask relative to original good_mask
good_indices = np.where(good_mask)[0]
good_mask[good_indices[outliers]] = False
# Final fit with cleaned data
x_final = x_annulus[good_mask]
y_final = y_annulus[good_mask]
z_final = z_annulus[good_mask]
if len(z_final) < max(10, (self.order + 1) * (self.order + 2) // 2):
# Sigma-clipping rejected too many points, use all data
x_final = x_annulus
y_final = y_annulus
z_final = z_annulus
# Fit gradient with cleaned data
if self.order == 0:
# Constant (mean)
bg_values[i] = np.mean(z_final)
elif self.order == 1:
# Plane: z = a + b*(x-x0) + c*(y-y0)
dx = x_final - xi
dy = y_final - yi
A = np.column_stack([np.ones_like(x_final), dx, dy])
try:
coeffs = np.linalg.lstsq(A, z_final, rcond=None)[0]
bg_values[i] = coeffs[0] # Value at source position
except np.linalg.LinAlgError:
bg_values[i] = np.mean(z_final)
elif self.order == 2:
# Quadratic: z = a + b*dx + c*dy + d*dx^2 + e*dy^2 + f*dx*dy
dx = x_final - xi
dy = y_final - yi
A = np.column_stack([np.ones_like(x_final), dx, dy, dx**2, dy**2, dx * dy])
try:
coeffs = np.linalg.lstsq(A, z_final, rcond=None)[0]
bg_values[i] = coeffs[0] # Value at source position
except np.linalg.LinAlgError:
bg_values[i] = np.mean(z_final)
else:
raise ValueError(f"order={self.order} not supported. Use 0, 1, or 2.")
return bg_values[0] if scalar_input else bg_values
def __repr__(self):
return (
f"GradientLocalBackground(inner_radius={self.inner_radius}, "
f"outer_radius={self.outer_radius}, order={self.order}, "
f"sigma={self.sigma}, maxiters={self.maxiters})"
)
[docs]
def measure_objects_psf(
obj,
image,
psf=None,
psf_size=None,
fwhm=None,
mask=None,
bg=None,
err=None,
gain=None,
bg_size=64,
bkgann=None,
bkg_order=1,
sn=None,
fit_shape='circular',
fit_size=None,
maxiters=3,
recentroid=True,
keep_negative=True,
get_bg=False,
use_position_dependent_psf=False,
group_sources=True,
grouper_radius=None,
verbose=False,
):
"""PSF photometry at the positions of already detected objects using photutils.
Performs PSF fitting photometry which is more accurate than aperture photometry,
especially for point sources in crowded fields or when accurate flux measurement
of PSF wings is important.
This function will estimate and subtract the background unless external background
estimation (`bg`) is provided, and use user-provided noise map (`err`) if requested.
If a PSF model is not provided, a simple Gaussian PSF will be constructed based on
the `fwhm` parameter or estimated from the data.
Parameters
----------
obj : `~astropy.table.Table`
Table with initial object detections to be measured. Must have 'x' and
'y' columns.
image : `~numpy.ndarray`
Input image as a 2D NumPy array.
psf : photutils PSF model, dict, or None, optional
PSF model to use. Can be a photutils PSF model (e.g.,
IntegratedGaussianPRF, FittableImageModel), a PSFEx PSF structure from
:func:`stdpipe.psf.run_psfex`, or None (will create Gaussian PSF based
on fwhm).
psf_size : int or None, optional
Size of the PSF model in pixels. If None, will be estimated from PSF or
set to 5*fwhm.
fwhm : float or None, optional
Full width at half maximum in pixels. Used if PSF model is not provided,
or to estimate psf_size. If None, will be estimated from obj['fwhm'] if
available.
mask : `~numpy.ndarray` or None, optional
Image mask as a boolean array (True values will be masked).
bg : `~numpy.ndarray` or None, optional
If provided, use this background (same shape as input image) instead of
automatically computed one.
err : `~numpy.ndarray` or None, optional
Image noise map to be used instead of automatically computed one.
gain : float or None, optional
Image gain in e-/ADU, used to build image noise model.
bg_size : int, optional
Background grid size in pixels.
bkgann : list of float or None, optional
Background annulus for local background estimation, [inner_radius,
outer_radius] in pixels. If None, no local background subtraction is
performed (relies only on global Background2D subtraction). If set, uses
gradient-aware local background fitting to handle non-uniform
backgrounds. Note: radii are NOT scaled by FWHM (unlike
measure_objects).
bkg_order : int, optional
Polynomial order for local background fitting. 0 = constant (mean),
1 = plane (linear gradient, recommended), 2 = quadratic surface. Only
used if bkgann is set.
sn : float or None, optional
Minimal S/N ratio for the object to be considered good. If set, all
measurements with magnitude errors exceeding 1/sn will be discarded.
fit_shape : str, optional
Shape of fitting region. Options: 'circular' (default), 'square'.
Determines the aperture used for PSF fitting.
fit_size : int or None, optional
Size of fitting region in pixels. If None, defaults to psf_size.
maxiters : int, optional
Maximum number of iterations for PSF fitting.
recentroid : bool, optional
If True, allow PSF position to vary during fitting (recommended).
keep_negative : bool, optional
If False, measurements with negative fluxes will be discarded.
get_bg : bool, optional
If True, the routine will also return estimated background and
background noise images.
use_position_dependent_psf : bool, optional
If True and PSF is a PSFEx model, use polynomial evaluation for
position-dependent PSF (evaluates PSF at each source position).
group_sources : bool, optional
If True, use grouped PSF fitting for overlapping sources. Fits nearby
sources simultaneously for better accuracy in crowded fields.
grouper_radius : float or None, optional
Radius in pixels for grouping nearby sources. If None, defaults to
2*psf_size. Only used if group_sources is True.
verbose : bool or callable, optional
Whether to show verbose messages during the run. May be either boolean,
or a ``print``-like function.
Returns
-------
result : `~astropy.table.Table` or tuple
Copy of original table with ``flux``, ``fluxerr``, ``mag``, ``magerr``,
``x_psf``, ``y_psf`` columns from PSF fitting. Also includes quality of
fit columns: ``qfit_psf`` (fit quality, 0=good), ``cfit_psf`` (central
pixel fit quality), ``flags_psf`` (photutils fit flags), ``npix_psf``
(number of unmasked pixels used in fit), and ``reduced_chi2_psf``
(reduced chi-squared, available in photutils >= 2.3.0). If `get_bg` is
True, returns a tuple of (table, background, background_error).
"""
# Simple wrapper around print for logging in verbose mode only
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if not len(obj):
log('No objects to measure')
return obj
# Operate on the copy of the table
obj = obj.copy()
from .photometry_measure import (
_prepare_image_and_mask,
_extract_valid_positions,
_compute_magnitudes_and_filter,
)
image1, mask0, mask = _prepare_image_and_mask(image, mask)
# Background estimation
if bg is None or err is None or get_bg:
log('Estimating global background with %dx%d mesh' % (bg_size, bg_size))
bg_est = photutils.background.Background2D(
image1, bg_size, mask=mask | mask0, exclude_percentile=90
)
bg_est_bg = bg_est.background
bg_est_rms = bg_est.background_rms
else:
bg_est = None
if bg is None:
log(
'Subtracting global background: median %.1f rms %.2f'
% (np.median(bg_est_bg), np.std(bg_est_bg))
)
image1 -= bg_est_bg
else:
log(
'Subtracting user-provided background: median %.1f rms %.2f'
% (np.median(bg), np.std(bg))
)
image1 -= bg
image1[mask0] = 0
# Error map
if err is None:
log(
'Using global background noise map: median %.1f rms %.2f + gain %.1f'
% (
np.median(bg_est_rms),
np.std(bg_est_rms),
gain if gain else np.inf,
)
)
err = bg_est_rms
if gain:
err = calc_total_error(image1, err, gain)
else:
log('Using user-provided noise map: median %.1f rms %.2f' % (np.median(err), np.std(err)))
# Estimate FWHM if not provided
if fwhm is None:
if 'fwhm' in obj.colnames:
# Use median FWHM from detections
fwhm_vals = obj['fwhm'][np.isfinite(obj['fwhm'])]
if len(fwhm_vals) > 0:
fwhm = np.median(fwhm_vals)
log('Using median FWHM from detections: %.2f pixels' % fwhm)
else:
fwhm = 3.0
log('No valid FWHM values in detections, using default: %.2f pixels' % fwhm)
else:
fwhm = 3.0
log('FWHM not provided and not in object table, using default: %.2f pixels' % fwhm)
# Create or process PSF model
psf_is_position_dependent = False # Track if PSF varies with position
if psf is None:
# Create a simple Gaussian PSF
sigma = fwhm / (2 * np.sqrt(2 * np.log(2))) # Convert FWHM to sigma
log('Creating Gaussian PSF model with sigma=%.2f pixels (FWHM=%.2f)' % (sigma, fwhm))
# Use CircularGaussianSigmaPRF (replaces deprecated IntegratedGaussianPRF)
psf_model = photutils.psf.CircularGaussianSigmaPRF(sigma=sigma)
if psf_size is None:
psf_size = _odd_int(5 * fwhm)
elif isinstance(psf, dict) and 'data' in psf and 'sampling' in psf:
# PSFEx-like dict structure (from run_psfex, load_psf, or create_psf_model)
psf_data = psf['data']
psf_sampling = psf['sampling']
psf_degree = psf.get('degree', 0)
if use_position_dependent_psf and psf_degree > 0:
log('Using position-dependent PSFEx PSF model (degree=%d)' % psf_degree)
# Store the PSFEx model for later use
# We'll handle position-dependent photometry specially
psf_model = psf # Keep original PSFEx dict
psf_is_position_dependent = True
if psf_size is None:
psf_size = _compute_native_psf_size(psf['height'], psf_sampling)
else:
log('Using PSFEx/ePSF PSF model (constant across field)')
# Get PSF stamp at center position (0,0 works for degree=0)
psf_image = psf_module.get_supersampled_psf_stamp(psf, x=0, y=0, normalize=True)
# Handle oversampling if needed
oversampling = _compute_oversampling(psf_sampling)
psf_image = _scale_psf_image_for_photutils(psf_image, oversampling)
psf_model = photutils.psf.ImagePSF(psf_image, oversampling=oversampling)
psf_is_position_dependent = False
if psf_size is None:
psf_size = _compute_native_psf_size(psf_image.shape[0], psf_sampling)
elif isinstance(psf, (photutils.psf.ImagePSF, photutils.psf.FittableImageModel)):
# Already a photutils PSF model (ImagePSF or legacy FittableImageModel)
log('Using provided photutils ImagePSF model')
psf_model = psf
if psf_size is None:
psf_size = _odd_int(psf.data.shape[0])
elif hasattr(psf, 'fwhm'):
# Photutils ePSF or similar
log('Using provided photutils PSF model with FWHM')
psf_model = psf
if psf_size is None:
psf_size = (
_odd_int(psf.data.shape[0]) if hasattr(psf, 'data') else _odd_int(5 * psf.fwhm)
)
else:
# Assume it's a photutils PSF model
log('Using provided PSF model')
psf_model = psf
if psf_size is None:
psf_size = _odd_int(5 * fwhm)
log('Using PSF size: %d pixels' % psf_size)
# Fitting region size
if fit_size is None:
fit_size = psf_size
fit_size = _odd_int(fit_size)
log('Using fitting region size: %d pixels' % fit_size)
# Prepare initial positions table
x_vals, y_vals, valid_pos = _extract_valid_positions(obj)
init_params = Table()
init_params['x'] = x_vals
init_params['y'] = y_vals
# Add initial flux guesses if available
if 'flux' in obj.colnames:
init_params['flux'] = np.ma.filled(np.asarray(obj['flux']), fill_value=np.nan)
else:
# Estimate initial flux from image at positions
init_params['flux'] = 1000.0 # Default initial guess
if fit_shape not in ['circular', 'square']:
raise ValueError("fit_shape must be 'circular' or 'square'")
fit_mask = None
if fit_shape == 'circular' and np.any(valid_pos):
fit_mask = _build_circular_fit_mask(
image1.shape,
init_params['x'][valid_pos],
init_params['y'][valid_pos],
radius=fit_size / 2,
)
# Import fitting class
from astropy.modeling.fitting import LevMarLSQFitter
# Configure grouping if requested
grouper = None
if group_sources:
if grouper_radius is None:
grouper_radius = 2 * psf_size
log('Using grouped PSF fitting with grouper radius %.1f pixels' % grouper_radius)
grouper = photutils.psf.SourceGrouper(min_separation=grouper_radius)
# Check for invalid positions (from masked columns)
n_invalid = np.sum(~valid_pos)
if n_invalid > 0:
log('Found %d objects with invalid (masked/NaN) positions, will be skipped' % n_invalid)
mask_for_fit = mask | mask0
xy_bounds = None if recentroid else 1e-6
# Perform PSF photometry
log('Performing PSF photometry on %d objects (%d valid)' % (len(obj), np.sum(valid_pos)))
log(
'Settings: %d iterations, recentroid=%s, grouped=%s, position_dependent=%s'
% (maxiters, recentroid, group_sources, psf_is_position_dependent)
)
# Handle position-dependent PSF separately
if psf_is_position_dependent:
log('Performing position-dependent PSF photometry (iterative mode)')
# Initialize output columns
obj['flux'] = np.nan
obj['fluxerr'] = np.nan
obj['x_psf'] = obj['x']
obj['y_psf'] = obj['y']
obj['qfit_psf'] = np.nan
obj['cfit_psf'] = np.nan
obj['flags_psf'] = 0
obj['npix_psf'] = 0
obj['reduced_chi2_psf'] = np.nan
if 'flags' not in obj.keys():
obj['flags'] = 0
# Get sampling (psf_model is always dict at this point)
psf_sampling = psf_model['sampling']
oversampling = _compute_oversampling(psf_sampling)
# Process objects individually or in small groups
# For each object, evaluate PSF at its position
for i in range(len(obj)):
# Skip invalid positions (masked/NaN)
if not valid_pos[i]:
obj['flux'][i] = np.nan
obj['fluxerr'][i] = np.nan
obj['flags'][i] |= 0x1000
continue
try:
# Get object position
x_pos = float(init_params['x'][i])
y_pos = float(init_params['y'][i])
# Evaluate PSF at this position using dict-based PSF
psf_image = psf_module.get_supersampled_psf_stamp(
psf_model, x=x_pos, y=y_pos, normalize=True
)
psf_image = _scale_psf_image_for_photutils(psf_image, oversampling)
# Create photutils PSF model for this position
psf_at_pos = photutils.psf.ImagePSF(psf_image, oversampling=oversampling)
# Set up local background estimator if requested
localbkg_estimator = None
if bkgann is not None and len(bkgann) == 2:
localbkg_estimator = GradientLocalBackground(
bkgann[0], bkgann[1], order=bkg_order
)
# Set up photometry for this object
phot_single = photutils.psf.PSFPhotometry(
psf_model=psf_at_pos,
fit_shape=fit_size,
finder=None,
grouper=grouper,
fitter=LevMarLSQFitter(),
fitter_maxiters=maxiters,
xy_bounds=xy_bounds,
aperture_radius=fit_size / 2,
localbkg_estimator=localbkg_estimator,
)
# Measure this object
init_single = Table()
init_single['x'] = [obj['x'][i]]
init_single['y'] = [obj['y'][i]]
if 'flux' in init_params.colnames:
init_single['flux'] = [init_params['flux'][i]]
else:
init_single['flux'] = [1000.0]
result_single = phot_single(
image1, mask=mask_for_fit, error=err, init_params=init_single
)
# Extract results
obj['flux'][i] = result_single['flux_fit'][0]
obj['fluxerr'][i] = result_single['flux_err'][0]
obj['x_psf'][i] = result_single['x_fit'][0]
obj['y_psf'][i] = result_single['y_fit'][0]
# Extract quality of fit columns if available
if 'qfit' in result_single.colnames:
obj['qfit_psf'][i] = result_single['qfit'][0]
if 'cfit' in result_single.colnames:
obj['cfit_psf'][i] = result_single['cfit'][0]
if 'flags' in result_single.colnames:
obj['flags_psf'][i] = result_single['flags'][0]
if 'npixfit' in result_single.colnames:
obj['npix_psf'][i] = result_single['npixfit'][0]
if 'reduced_chi2' in result_single.colnames:
obj['reduced_chi2_psf'][i] = result_single['reduced_chi2'][0]
# Flag if fit failed
if not np.isfinite(obj['flux'][i]):
obj['flags'][i] |= 0x1000
# Also flag if fit didn't converge or returned input unchanged
elif 'flags' in result_single.colnames:
# Check bit 0 (convergence failure)
bit0_set = (result_single['flags'][0] & 1) != 0
# Check for exact match with input when photutils claims it converged
# (bit 0 NOT set). This catches cases where photutils returns input
# unchanged but doesn't set bit 0.
converged_but_unchanged = (
(result_single['flags'][0] & 1) == 0
and obj['flux'][i] == init_single['flux'][0]
and obj['x_psf'][i] == init_single['x'][0]
and obj['y_psf'][i] == init_single['y'][0]
)
if bit0_set or converged_but_unchanged:
log(
'Warning: PSF fit did not converge or returned unchanged parameters for object %d, setting flux to NaN'
% i
)
obj['flux'][i] = np.nan
obj['fluxerr'][i] = np.nan
obj['flags'][i] |= 0x1000
# Flag if position moved significantly
if recentroid:
if (
np.sqrt(
(obj['x_psf'][i] - obj['x'][i]) ** 2
+ (obj['y_psf'][i] - obj['y'][i]) ** 2
)
> 1.0
):
obj['flags'][i] |= 0x2000
except Exception as e:
log('PSF photometry failed for object %d: %s' % (i, str(e)))
obj['flux'][i] = np.nan
obj['fluxerr'][i] = np.nan
obj['flags'][i] |= 0x1000
else:
# Standard (non-position-dependent) PSF photometry
# Initialize output columns with NaN (for invalid positions)
obj['flux'] = np.nan
obj['fluxerr'] = np.nan
obj['x_psf'] = np.ma.filled(np.asarray(obj['x']), fill_value=np.nan)
obj['y_psf'] = np.ma.filled(np.asarray(obj['y']), fill_value=np.nan)
obj['qfit_psf'] = np.nan
obj['cfit_psf'] = np.nan
obj['flags_psf'] = 0
obj['npix_psf'] = 0
obj['reduced_chi2_psf'] = np.nan
if 'flags' not in obj.keys():
obj['flags'] = 0
# Mark invalid positions as failed
obj['flags'][~valid_pos] |= 0x1000
# Only proceed if there are valid positions
if np.sum(valid_pos) > 0:
try:
# Filter init_params to valid positions only
init_params_valid = init_params[valid_pos]
# Set up local background estimator if requested
localbkg_estimator = None
if bkgann is not None and len(bkgann) == 2:
inner_rad = bkgann[0]
outer_rad = bkgann[1]
order_names = {0: 'constant (mean)', 1: 'plane', 2: 'quadratic'}
order_name = order_names.get(bkg_order, f'order-{bkg_order}')
log(
'Using local background annulus %.1f-%.1f pixels with %s fitting'
% (inner_rad, outer_rad, order_name)
)
# Create gradient-aware local background estimator
localbkg_estimator = GradientLocalBackground(
inner_rad, outer_rad, order=bkg_order
)
# Set up photometry object
phot_obj = photutils.psf.PSFPhotometry(
psf_model=psf_model,
fit_shape=fit_size,
finder=None, # We already have positions
grouper=grouper, # Group nearby sources if requested
fitter=LevMarLSQFitter(), # Levenberg-Marquardt fitter from astropy
fitter_maxiters=maxiters,
xy_bounds=xy_bounds,
aperture_radius=fit_size / 2,
localbkg_estimator=localbkg_estimator,
)
# Do the photometry - photutils 2.x API
result = phot_obj(
image1, mask=mask_for_fit, error=err, init_params=init_params_valid
)
# Map results back to full array
obj['flux'][valid_pos] = result['flux_fit']
obj['fluxerr'][valid_pos] = result['flux_err']
obj['x_psf'][valid_pos] = result['x_fit']
obj['y_psf'][valid_pos] = result['y_fit']
# Extract quality of fit columns if available
if 'qfit' in result.colnames:
obj['qfit_psf'][valid_pos] = result['qfit']
if 'cfit' in result.colnames:
obj['cfit_psf'][valid_pos] = result['cfit']
if 'flags' in result.colnames:
obj['flags_psf'][valid_pos] = result['flags']
if 'npixfit' in result.colnames:
obj['npix_psf'][valid_pos] = result['npixfit']
if 'reduced_chi2' in result.colnames:
# Available in photutils >= 2.3.0
obj['reduced_chi2_psf'][valid_pos] = result['reduced_chi2']
# Flag objects where fit failed (NaN values)
bad_idx = valid_pos & ~np.isfinite(obj['flux'])
obj['flags'][bad_idx] |= 0x1000 # PSF fit failed
# Flag objects where position moved significantly (>1 pixel)
if recentroid:
moved_idx = valid_pos & (
np.sqrt(
(obj['x_psf'] - init_params['x']) ** 2
+ (obj['y_psf'] - init_params['y']) ** 2
)
> 1.0
)
obj['flags'][moved_idx] |= 0x2000 # Large centroid shift
# Flag fits that didn't converge (photutils returns initial guess unchanged)
# Check for flags_psf bit 0 (convergence failure) or exact match with input
if 'flags_psf' in obj.colnames and 'flux' in init_params.colnames:
# Photutils flags: bit 0 = fit did not converge
# When fit doesn't converge, photutils returns initial parameters unchanged
unconverged = valid_pos & ((obj['flags_psf'] & 1) != 0)
# Also check for exact byte-level match between input and output when photutils
# claims the fit converged (bit 0 NOT set). This catches cases where photutils
# returns input unchanged but doesn't set bit 0.
converged_but_unchanged = (
valid_pos
& ((obj['flags_psf'] & 1) == 0)
& (obj['flux'] == init_params['flux'])
& (obj['x_psf'] == init_params['x'])
& (obj['y_psf'] == init_params['y'])
)
# Combine both conditions
failed = unconverged | converged_but_unchanged
if np.sum(failed) > 0:
log(
'Warning: %d PSF fits failed or returned unchanged parameters, setting flux to NaN'
% np.sum(failed)
)
obj['flux'][failed] = np.nan
obj['fluxerr'][failed] = np.nan
obj['flags'][failed] |= 0x1000 # PSF fit failed
except Exception as e:
log('PSF photometry failed: %s' % str(e))
log('Falling back to NaN values')
obj['flags'][valid_pos] |= 0x1000
obj = _compute_magnitudes_and_filter(obj, sn, keep_negative, log)
log('PSF photometry complete: %d objects measured' % len(obj))
if get_bg:
return obj, bg_est_bg, err
else:
return obj