"""
Aperture photometry using PyRAF DAOPHOT routines.
This module provides photometry functionality compatible with stdpipe.photometry
but using IRAF/DAOPHOT tasks for aperture photometry measurements.
"""
import os
import tempfile
import shutil
import numpy as np
from astropy.io import fits
from astropy.table import Table
import photutils.background
# Check if pyraf is available without importing it yet
# (importing pyraf initializes IRAF which can cause issues with pytest stdin/stdout capturing)
def _check_pyraf_available():
"""Check if pyraf is available and can actually import iraf."""
try:
# First check if pyraf package exists
import importlib.util
spec = importlib.util.find_spec("pyraf")
if spec is None:
return False
# Now try to actually import iraf to verify it works
# This is necessary because pyraf might be installed but broken
from pyraf import iraf
return True
except (ImportError, ValueError, AttributeError):
return False
PYRAF_AVAILABLE = _check_pyraf_available()
# Global variable to hold iraf module once imported
_iraf = None
def _get_iraf():
"""Lazy import of pyraf.iraf module."""
global _iraf
if _iraf is None:
try:
from pyraf import iraf
_iraf = iraf
except ImportError as e:
raise ImportError(
"PyRAF is not available. Please install PyRAF to use this module."
) from e
return _iraf
def _select_psf_stars(obj, fwhm, n_stars=20, isolation_radius=None, max_stars=50, verbose=False):
"""
Select good PSF stars from object table.
Selects stars suitable for building a PSF model based on:
- Brightness (high flux or low magnitude)
- High S/N ratio (if flux_err available)
- Isolation (no nearby neighbors)
- Not saturated (if flags available)
- Roundness (if shape parameters available)
Parameters
----------
obj : astropy.table.Table
Object table with detected sources.
fwhm : float
FWHM in pixels for isolation checking.
n_stars : int
Target number of PSF stars to select.
isolation_radius : float or None
Isolation radius in pixels. Default is 10*fwhm.
max_stars : int
Maximum number of candidates to consider.
verbose : bool or callable
Verbose output.
Returns
-------
numpy.ndarray
Array of indices of selected PSF stars.
"""
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if len(obj) == 0:
log("No objects available for PSF star selection")
return np.array([], dtype=int)
if isolation_radius is None:
isolation_radius = 10 * fwhm
# Start with all objects
candidates = np.ones(len(obj), dtype=bool)
# Filter out saturated objects if flags are available
if 'flags' in obj.colnames:
# Assume flag 0x04 indicates saturation (common convention)
saturated = (obj['flags'] & 0x04) != 0
candidates &= ~saturated
log(f"Removed {np.sum(saturated)} saturated objects")
# Prefer objects with good S/N if flux_err is available
if 'flux' in obj.colnames and 'flux_err' in obj.colnames:
sn = obj['flux'] / obj['flux_err']
# Require S/N > 20 for PSF stars
good_sn = sn > 20
candidates &= good_sn
log(f"Filtered to {np.sum(candidates)} objects with S/N > 20")
# Require positive flux
if 'flux' in obj.colnames:
candidates &= obj['flux'] > 0
if np.sum(candidates) == 0:
log("Warning: No suitable PSF star candidates found")
return np.array([], dtype=int)
# Sort by brightness (flux or magnitude)
if 'flux' in obj.colnames:
# Use flux - higher is brighter
brightness = obj['flux'].copy()
brightness[~candidates] = -np.inf
sorted_idx = np.argsort(brightness)[::-1] # Descending order
elif 'mag' in obj.colnames:
# Use magnitude - lower is brighter
brightness = obj['mag'].copy()
brightness[~candidates] = np.inf
sorted_idx = np.argsort(brightness) # Ascending order
else:
log("Warning: No flux or mag column available, cannot sort by brightness")
sorted_idx = np.arange(len(obj))
# Consider only the brightest candidates
sorted_idx = sorted_idx[:max_stars]
# Select isolated stars
selected = []
x_all = obj['x']
y_all = obj['y']
for idx in sorted_idx:
if not candidates[idx]:
continue
# Check isolation - no other bright stars nearby
x, y = x_all[idx], y_all[idx]
distances = np.sqrt((x_all - x) ** 2 + (y_all - y) ** 2)
# Count neighbors within isolation radius (excluding self)
neighbors = np.sum((distances < isolation_radius) & (distances > 0) & candidates)
if neighbors == 0:
selected.append(idx)
if len(selected) >= n_stars:
break
selected = np.array(selected, dtype=int)
log(f"Selected {len(selected)} isolated PSF stars from {np.sum(candidates)} candidates")
return selected
[docs]
def measure_objects(
obj,
image,
aper=3,
bkgann=None,
fwhm=None,
mask=None,
bg=None,
err=None,
gain=None,
bg_size=64,
sn=None,
centroid_iter=0,
keep_negative=True,
get_bg=False,
_workdir=None,
_tmpdir=None,
verbose=False,
):
"""Aperture photometry at the positions of already detected objects using PyRAF DAOPHOT.
This function provides similar behavior to :func:`stdpipe.photometry.measure_objects`
but uses IRAF DAOPHOT tasks for photometry measurements.
It will estimate and subtract the background unless external background estimation (`bg`)
is provided, and use user-provided noise map (`err`) if requested.
If the `mask` is provided, it will set 0x200 bit in object `flags` if at least one of
aperture pixels is masked.
The results may optionally be filtered to drop the detections with low signal to noise
ratio if `sn` parameter is set and positive. It will also filter out the events with
negative flux.
**Note:** Requires PyRAF and IRAF to be installed and properly configured.
Parameters
----------
obj : astropy.table.Table
Initial object detections to be measured. Must have 'x' and 'y' columns.
image : numpy.ndarray
Input image as a 2D NumPy array.
aper : float
Circular aperture radius in pixels for flux measurement.
bkgann : tuple of float or None
Background annulus as (inner_radius, outer_radius) for local background
estimation. If not set, global background model is used instead.
fwhm : float or None
If provided, `aper` and `bkgann` are measured in units of FWHM.
mask : numpy.ndarray or None
Boolean image mask (True values will be masked).
bg : numpy.ndarray or None
Background image to subtract instead of automatically computed one.
err : numpy.ndarray or None
Image noise map to use instead of automatically computed one.
gain : float or None
Image gain in e/ADU, used for photometry error estimation.
bg_size : int
Background grid size in pixels for global background estimation.
sn : float or None
Minimal S/N ratio. If set, measurements with magnitude errors
exceeding 1/sn will be discarded.
centroid_iter : int
Number of centroiding iterations before photometry (not implemented
for IRAF backend).
keep_negative : bool
If False, measurements with negative fluxes will be discarded.
get_bg : bool
If True, also return estimated background and background noise images.
_workdir : str or None
Directory for temporary files. If None, a temporary directory is created.
_tmpdir : str or None
Parent directory for temporary directory creation if _workdir is None.
verbose : bool or callable
Whether to show verbose messages. May be boolean or a print-like function.
Returns
-------
astropy.table.Table
Copy of original table with `flux`, `fluxerr`, `mag` and `magerr` columns
replaced with measured values. If ``get_bg=True``, returns a tuple of
(table, background, background_error).
Raises
------
ImportError
If PyRAF is not available.
RuntimeError
If DAOPHOT task fails.
"""
if not PYRAF_AVAILABLE:
raise ImportError("PyRAF is not available. Please install PyRAF to use this module.")
# Simple wrapper around print for logging in verbose mode only
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if not len(obj):
log('No objects to measure')
return obj
# Check required columns
if 'x' not in obj.colnames or 'y' not in obj.colnames:
raise ValueError("Object table must have 'x' and 'y' columns")
# Operate on the copy of the list
obj = obj.copy()
# Sanitize the image and make its copy to safely operate on it
image1 = image.astype(np.double)
mask0 = ~np.isfinite(image1) # Minimal mask
# Ensure that the mask is defined
if mask is None:
mask = mask0
else:
mask = mask.astype(bool)
# Background estimation
if bg is None or err is None or get_bg:
log('Estimating global background with %dx%d mesh' % (bg_size, bg_size))
bg_est = photutils.background.Background2D(
image1, bg_size, mask=mask | mask0, exclude_percentile=90
)
bg_est_bg = bg_est.background
bg_est_rms = bg_est.background_rms
else:
bg_est = None
if bg is None:
log(
'Subtracting global background: median %.1f rms %.2f'
% (np.median(bg_est_bg), np.std(bg_est_bg))
)
image1 = image1 - bg_est_bg
bg_used = bg_est_bg
else:
log(
'Subtracting user-provided background: median %.1f rms %.2f'
% (np.median(bg), np.std(bg))
)
image1 = image1 - bg
bg_used = bg
image1[mask0] = 0
if err is None:
if bg_est is not None:
err = bg_est_rms
log(
'Using global background noise map: median %.1f rms %.2f'
% (np.median(err), np.std(err))
)
else:
# Estimate simple RMS if no background estimation was done
err = np.ones_like(image1) * np.nanstd(image1)
log('Using simple RMS estimate: %.2f' % np.nanstd(image1))
else:
log('Using user-provided noise map: median %.1f rms %.2f' % (np.median(err), np.std(err)))
# Scale apertures by FWHM if requested
if fwhm is not None and fwhm > 0:
log('Scaling aperture radii with FWHM %.1f pix' % fwhm)
aper_scaled = aper * fwhm
if bkgann is not None:
bkgann_scaled = [_ * fwhm for _ in bkgann]
else:
bkgann_scaled = None
else:
aper_scaled = aper
bkgann_scaled = bkgann
log('Using aperture radius %.1f pixels' % aper_scaled)
if centroid_iter > 0:
log('Warning: centroid_iter is not implemented for IRAF backend, ignoring')
# Create or use working directory
workdir = _workdir if _workdir is not None else tempfile.mkdtemp(prefix='iraf', dir=_tmpdir)
remove_workdir = _workdir is None # Only remove if we created it
try:
# Write image to temporary FITS file
image_file = os.path.join(workdir, 'image.fits')
fits.writeto(image_file, image1.astype(np.float32), overwrite=True)
# Write coordinate file
coord_file = os.path.join(workdir, 'coords.txt')
# IRAF uses 1-indexed coordinates
with open(coord_file, 'w') as f:
for i, row in enumerate(obj):
f.write('%.3f %.3f\n' % (row['x'] + 1, row['y'] + 1))
# Output file for photometry results
phot_file = os.path.join(workdir, 'phot.mag')
# Get iraf module (lazy import)
iraf = _get_iraf()
# Set up DAOPHOT parameters
iraf.noao(_doprint=0)
iraf.digiphot(_doprint=0)
iraf.daophot(_doprint=0)
# Configure photometry parameters
iraf.datapars.fwhmpsf = fwhm if fwhm is not None else 3.0
iraf.datapars.sigma = np.median(err) if err is not None else 1.0
iraf.datapars.datamin = 'INDEF'
iraf.datapars.datamax = 'INDEF'
iraf.datapars.gain = gain if gain is not None else 1.0
iraf.datapars.readnoise = 0.0 # Already included in error map
# Set aperture parameters
iraf.photpars.apertures = aper_scaled
iraf.photpars.zmag = 25.0 # Arbitrary zero point
# Set sky fitting parameters
if bkgann_scaled is not None:
log(
'Using local background annulus between %.1f and %.1f pixels'
% (bkgann_scaled[0], bkgann_scaled[1])
)
iraf.fitskypars.annulus = bkgann_scaled[0]
iraf.fitskypars.dannulus = bkgann_scaled[1] - bkgann_scaled[0]
iraf.fitskypars.salgorithm = 'mode'
else:
# Use minimal annulus if no local background requested
# This effectively uses already subtracted background
iraf.fitskypars.annulus = aper_scaled + 1
iraf.fitskypars.dannulus = 1
iraf.fitskypars.salgorithm = 'constant'
iraf.fitskypars.skyvalue = 0.0
# Run DAOPHOT phot task
log('Running IRAF DAOPHOT phot task')
iraf.phot(
image=image_file,
coords=coord_file,
output=phot_file,
interactive='no',
verify='no',
verbose='no',
)
# Parse DAOPHOT output
log('Parsing DAOPHOT output')
flux_list = []
fluxerr_list = []
mag_list = []
magerr_list = []
sky_list = []
# Read the output file using IRAF txdump
txdump_file = os.path.join(workdir, 'txdump.txt')
iraf.txdump(
textfiles=phot_file, fields='FLUX,MERR,MAG,MSKY', expr='yes', Stdout=txdump_file
)
# Parse txdump output
with open(txdump_file, 'r') as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 4:
flux = float(parts[0]) if parts[0] != 'INDEF' else np.nan
magerr = float(parts[1]) if parts[1] != 'INDEF' else np.nan
mag = float(parts[2]) if parts[2] != 'INDEF' else np.nan
sky = float(parts[3]) if parts[3] != 'INDEF' else np.nan
flux_list.append(flux)
mag_list.append(mag)
magerr_list.append(magerr)
sky_list.append(sky)
# Calculate flux error from magnitude error
if np.isfinite(flux) and np.isfinite(magerr) and flux > 0:
fluxerr = flux * magerr * np.log(10) / 2.5
else:
fluxerr = np.nan
fluxerr_list.append(fluxerr)
# Verify we got results for all objects
if len(flux_list) != len(obj):
log('Warning: DAOPHOT returned %d results for %d objects' % (len(flux_list), len(obj)))
# Pad with NaNs if needed
while len(flux_list) < len(obj):
flux_list.append(np.nan)
fluxerr_list.append(np.nan)
mag_list.append(np.nan)
magerr_list.append(np.nan)
sky_list.append(np.nan)
# Store results in the object table
obj['flux'] = np.array(flux_list)
obj['fluxerr'] = np.array(fluxerr_list)
obj['mag'] = np.array(mag_list)
obj['magerr'] = np.array(magerr_list)
if 'flags' not in obj.keys():
obj['flags'] = 0
# Flag objects with masked pixels in aperture
# FIXME: This is a simplified check - just check if object is near masked pixels
for i, row in enumerate(obj):
x, y = int(row['x']), int(row['y'])
# Check pixels in aperture
y_grid, x_grid = np.ogrid[
max(0, y - int(aper_scaled) - 1) : min(image.shape[0], y + int(aper_scaled) + 2),
max(0, x - int(aper_scaled) - 1) : min(image.shape[1], x + int(aper_scaled) + 2),
]
r = np.sqrt((x_grid - x) ** 2 + (y_grid - y) ** 2)
if np.any(mask[y_grid, x_grid][r <= aper_scaled]):
obj['flags'][i] |= 0x200
# Flag objects with zero flux returned - these are truncated?..
obj['flags'][obj['flux'] == 0] &= 0x010 # SExtractor flag for truncated aperture
# Store local background if available
if bkgann_scaled is not None:
obj['bg_local'] = np.array(sky_list)
# Final filtering of properly measured objects
if sn is not None and sn > 0:
log('Filtering out measurements with S/N < %.1f' % sn)
idx = np.isfinite(obj['magerr'])
idx[idx] &= obj['magerr'][idx] < 1 / sn
obj = obj[idx]
if not keep_negative:
log('Filtering out measurements with negative fluxes')
idx = obj['flux'] > 0
obj = obj[idx]
finally:
# Clean up temporary directory only if we created it
if remove_workdir and os.path.exists(workdir):
shutil.rmtree(workdir)
if get_bg:
return obj, bg_used, err
else:
return obj
[docs]
def measure_objects_psf(
obj,
image,
fwhm=None,
psf_stars=None,
n_psf_stars=20,
mask=None,
bg=None,
err=None,
gain=None,
bg_size=64,
sn=None,
psfrad=None,
fitrad=None,
psf_function='auto',
psf_varorder=0,
keep_negative=True,
get_bg=False,
_workdir=None,
_tmpdir=None,
verbose=False,
):
"""PSF photometry using IRAF DAOPHOT (psf + allstar tasks).
This function performs PSF photometry using the full DAOPHOT workflow:
1. Select PSF stars (or use provided psf_stars)
2. Build PSF model using DAOPHOT psf task
3. Fit all sources using DAOPHOT allstar task
4. Return results in stdpipe format with quality metrics
The PSF model is built automatically from bright, isolated stars in the image.
If psf_stars is provided, those specific stars will be used instead of automatic
selection.
**Note:** Requires PyRAF and IRAF to be installed and properly configured.
Parameters
----------
obj : astropy.table.Table
Initial object detections. Must have 'x' and 'y' columns. Should also
have 'flux' or 'mag' for PSF star selection.
image : numpy.ndarray
Input image as a 2D NumPy array.
fwhm : float
FWHM in pixels (required for PSF building and fitting).
psf_stars : array-like or None
Indices of objects to use as PSF stars. If None, stars are selected
automatically.
n_psf_stars : int
Number of PSF stars to select automatically.
mask : numpy.ndarray or None
Boolean image mask (True values will be masked).
bg : numpy.ndarray or None
Background image to subtract instead of automatically computed one.
err : numpy.ndarray or None
Image noise map.
gain : float or None
Image gain in e/ADU.
bg_size : int
Background grid size in pixels.
sn : float or None
Minimal S/N ratio for output filtering.
psfrad : float or None
PSF radius in pixels (default: 3*fwhm). Sets how far the PSF extends.
fitrad : float or None
Fitting radius in pixels (default: fwhm). Sets how many pixels to use
for fitting each star.
psf_function : str
PSF function type: 'auto', 'gauss', 'moffat15', 'moffat25', 'lorentz',
'penny1', or 'penny2'.
psf_varorder : int
PSF variability order: 0=constant, 1=linear, 2=quadratic.
keep_negative : bool
If False, discard measurements with negative fluxes.
get_bg : bool
If True, also return background and error images.
_workdir : str or None
Working directory for temporary files.
_tmpdir : str or None
Parent directory for temporary directory creation.
verbose : bool or callable
Verbose output.
Returns
-------
astropy.table.Table
Table with PSF photometry results including flux, mag, x_psf, y_psf,
qfit_psf, cfit_psf, flags_psf columns. If ``get_bg=True``, returns a
tuple of (table, background, background_error).
Raises
------
ImportError
If PyRAF is not available.
ValueError
If required parameters are missing.
RuntimeError
If DAOPHOT tasks fail.
"""
if not PYRAF_AVAILABLE:
raise ImportError("PyRAF is not available. Please install PyRAF to use this module.")
# Simple wrapper around print for logging
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if not len(obj):
log('No objects to measure')
return obj
# Check required parameters
if fwhm is None or fwhm <= 0:
raise ValueError("fwhm parameter is required and must be positive for PSF photometry")
# Check required columns
if 'x' not in obj.colnames or 'y' not in obj.colnames:
raise ValueError("Object table must have 'x' and 'y' columns")
# Operate on a copy
obj = obj.copy()
# Sanitize image
image1 = image.astype(np.double)
mask0 = ~np.isfinite(image1)
# Ensure mask is defined
if mask is None:
mask = mask0
else:
mask = mask.astype(bool)
# Background estimation
if bg is None or err is None or get_bg:
log('Estimating global background with %dx%d mesh' % (bg_size, bg_size))
bg_est = photutils.background.Background2D(
image1, bg_size, mask=mask | mask0, exclude_percentile=90
)
bg_est_bg = bg_est.background
bg_est_rms = bg_est.background_rms
else:
bg_est = None
if bg is None:
log(
'Subtracting global background: median %.1f rms %.2f'
% (np.median(bg_est_bg), np.std(bg_est_bg))
)
image1 = image1 - bg_est_bg
bg_used = bg_est_bg
else:
log(
'Subtracting user-provided background: median %.1f rms %.2f'
% (np.median(bg), np.std(bg))
)
image1 = image1 - bg
bg_used = bg
image1[mask0] = 0
if err is None:
if bg_est is not None:
err = bg_est_rms
log(
'Using global background noise map: median %.1f rms %.2f'
% (np.median(err), np.std(err))
)
else:
err = np.ones_like(image1) * np.nanstd(image1)
log('Using simple RMS estimate: %.2f' % np.nanstd(image1))
else:
log('Using user-provided noise map: median %.1f rms %.2f' % (np.median(err), np.std(err)))
# Set default PSF and fitting radii
if psfrad is None:
psfrad = 3 * fwhm
if fitrad is None:
fitrad = fwhm
log(f'Using PSF radius {psfrad:.1f} pixels, fitting radius {fitrad:.1f} pixels')
# Select PSF stars if not provided
if psf_stars is None:
# Adjust requested number if we have fewer objects
n_stars_to_select = min(n_psf_stars, max(2, len(obj) // 2))
log(f'Automatically selecting up to {n_stars_to_select} PSF stars from {len(obj)} objects')
psf_star_idx = _select_psf_stars(obj, fwhm, n_stars=n_stars_to_select, verbose=verbose)
if len(psf_star_idx) == 0:
raise RuntimeError("No suitable PSF stars found. Cannot build PSF model.")
log(f'Selected {len(psf_star_idx)} PSF stars')
else:
psf_star_idx = np.asarray(psf_stars, dtype=int)
log(f'Using {len(psf_star_idx)} user-provided PSF stars')
# Create working directory
workdir = _workdir if _workdir is not None else tempfile.mkdtemp(prefix='iraf_psf', dir=_tmpdir)
remove_workdir = _workdir is None
try:
# Write background-subtracted image
image_file = os.path.join(workdir, 'image.fits')
fits.writeto(image_file, image1.astype(np.float32), overwrite=True)
# Write coordinate file for all objects (1-indexed for IRAF)
coord_file = os.path.join(workdir, 'coords.txt')
with open(coord_file, 'w') as f:
for i, row in enumerate(obj):
f.write('%.3f %.3f\n' % (row['x'] + 1, row['y'] + 1))
# Get iraf module
iraf = _get_iraf()
# Set up DAOPHOT
iraf.noao(_doprint=0)
iraf.digiphot(_doprint=0)
iraf.daophot(_doprint=0)
# Configure data parameters
iraf.datapars.fwhmpsf = fwhm
iraf.datapars.sigma = np.median(err) if err is not None else 1.0
iraf.datapars.datamin = 'INDEF'
iraf.datapars.datamax = 'INDEF'
iraf.datapars.epadu = gain if gain is not None else 1.0
iraf.datapars.readnoise = 0.0
# Configure photometry parameters (needed for initial phot)
iraf.photpars.apertures = fitrad
iraf.photpars.zmag = 25.0
# Configure sky parameters (use constant zero - already subtracted)
iraf.fitskypars.annulus = psfrad + 1.0
iraf.fitskypars.dannulus = 1.0
iraf.fitskypars.salgorithm = 'constant'
iraf.fitskypars.skyvalue = 0.0
# Step 1: Run phot on all stars to get initial magnitudes
log('Running DAOPHOT phot task on all objects')
phot_file = os.path.join(workdir, 'all_stars.mag')
iraf.phot(
image=image_file,
coords=coord_file,
output=phot_file,
interactive='no',
verify='no',
verbose='no',
)
# Step 2: Create PSF star list file using pselect
log(f'Creating PSF star list with {len(psf_star_idx)} stars')
pst_file = os.path.join(workdir, 'psf_stars.pst')
# Build expression to select PSF stars by ID (1-indexed in IRAF)
# IDs in DAOPHOT are 1-indexed, so we add 1 to our 0-indexed array
psf_star_ids = [str(idx + 1) for idx in psf_star_idx]
expr = ' || '.join([f'ID=={id}' for id in psf_star_ids])
log(f'Selecting PSF stars with IDs: {", ".join(psf_star_ids)}')
# Use pselect to extract PSF stars from phot output
iraf.pselect(infiles=phot_file, outfiles=pst_file, expr=expr)
# Step 3: Build PSF model using psf task
log(f'Building PSF model with function={psf_function}, varorder={psf_varorder}')
psf_image = os.path.join(workdir, 'psf') # IRAF will add .fits extension
pst_out_file = os.path.join(workdir, 'psf_stars_used.pst')
grp_file = os.path.join(workdir, 'groups.grp')
# Configure PSF parameters
iraf.daopars.psfrad = psfrad
iraf.daopars.fitrad = fitrad
iraf.daopars.function = psf_function
iraf.daopars.varorder = psf_varorder
iraf.psf(
image=image_file,
photfile=phot_file,
pstfile=pst_file,
psfimage=psf_image,
opstfile=pst_out_file,
groupfile=grp_file,
interactive='no',
verify='no',
verbose='no',
)
log('PSF model built successfully')
# Step 4: Run allstar to fit PSF to all objects
log('Running DAOPHOT allstar task')
allstar_file = os.path.join(workdir, 'output.als')
reject_file = os.path.join(workdir, 'rejected.als')
subimage_file = os.path.join(workdir, 'subtracted.fits')
# Configure allstar parameters
iraf.daopars.recenter = 'yes'
iraf.daopars.fitsky = 'no' # Sky already subtracted
iraf.allstar(
image=image_file,
photfile=phot_file,
psfimage=psf_image,
allstarfile=allstar_file,
rejfile=reject_file,
subimage=subimage_file,
verify='no',
verbose='no',
)
log('PSF fitting completed, parsing results')
# Step 5: Parse allstar output
# .als format: ID X Y MAG MERR MSKY NITER SHARPNESS CHI PIER
txdump_als_file = os.path.join(workdir, 'txdump_als.txt')
iraf.txdump(
textfiles=allstar_file,
fields='ID,XCEN,YCEN,MAG,MERR,SHARPNESS,CHI,PIER',
expr='yes',
Stdout=txdump_als_file,
)
# Store results in object table
obj['flux'] = np.nan
obj['fluxerr'] = np.nan
obj['mag'] = np.nan
obj['magerr'] = np.nan
obj['x_psf'] = np.nan
obj['y_psf'] = np.nan
obj['qfit_psf'] = np.nan
obj['cfit_psf'] = np.nan
obj['flags_psf'] = 0
# Parse txdump output
with open(txdump_als_file, 'r') as f:
txdump_lines = f.readlines()
log(f'Allstar returned {len(txdump_lines)} fitted objects')
for line in txdump_lines:
parts = line.strip().split()
if len(parts) >= 8:
obj_id = int(parts[0])
x_fit = (
float(parts[1]) - 1 if parts[1] != 'INDEF' else np.nan
) # Convert to 0-indexed
y_fit = float(parts[2]) - 1 if parts[2] != 'INDEF' else np.nan
mag = float(parts[3]) if parts[3] != 'INDEF' else np.nan
magerr = float(parts[4]) if parts[4] != 'INDEF' else np.nan
sharpness = float(parts[5]) if parts[5] != 'INDEF' else np.nan
chi = float(parts[6]) if parts[6] != 'INDEF' else np.nan
pier = int(parts[7]) if parts[7] != 'INDEF' else 0
# Convert mag to flux (using ZP=25.0)
if np.isfinite(mag):
flux = 10 ** ((25.0 - mag) / 2.5)
if np.isfinite(magerr) and magerr > 0:
fluxerr = flux * magerr * np.log(10) / 2.5
else:
fluxerr = np.nan
else:
flux = np.nan
fluxerr = np.nan
obj['flux'][obj_id - 1] = flux
obj['fluxerr'][obj_id - 1] = fluxerr
obj['mag'][obj_id - 1] = mag
obj['magerr'][obj_id - 1] = magerr
obj['x_psf'][obj_id - 1] = x_fit
obj['y_psf'][obj_id - 1] = y_fit
obj['qfit_psf'][obj_id - 1] = chi
obj['cfit_psf'][obj_id - 1] = sharpness
obj['flags_psf'][obj_id - 1] = pier
# Initialize flags column if not present
if 'flags' not in obj.colnames:
obj['flags'] = 0
# Flag objects with non-zero pier flags
obj['flags'][obj['flags_psf'] != 0] |= 0x100
obj['flags'][~np.isfinite(obj['flux'])] |= 0x100
# Apply filters
if sn is not None and sn > 0:
log(f'Filtering out measurements with S/N < {sn:.1f}')
idx = np.isfinite(obj['magerr'])
idx[idx] &= obj['magerr'][idx] < 1 / sn
obj = obj[idx]
if not keep_negative:
log('Filtering out measurements with negative fluxes')
idx = obj['flux'] > 0
obj = obj[idx]
finally:
# Clean up
if remove_workdir and os.path.exists(workdir):
shutil.rmtree(workdir)
if get_bg:
return obj, bg_used, err
else:
return obj