Source code for stdpipe.psf

"""
Module for working with point-spread function (PSF) models
"""

import os, shutil, tempfile, shlex
import numpy as np

from astropy.io import fits
from astropy.table import Table
from astropy.nddata import NDData

from scipy import ndimage

import photutils.psf

from . import photometry
from . import utils


[docs] def run_psfex( image, mask=None, thresh=2.0, aper=None, r0=0.0, gain=1, minarea=5, vignet_size=None, psf_size=None, order=0, sex_extra=None, checkimages=None, extra=None, psffile=None, get_obj=False, _workdir=None, _tmpdir=None, _exe=None, _sex_exe=None, verbose=False, ): """Wrapper around PSFEx to help extracting PSF models from images. For the details of PSFEx operation we suggest to consult its documentation at https://psfex.readthedocs.io Parameters ---------- image : numpy.ndarray Input image as a NumPy array. mask : numpy.ndarray, optional Image mask as a boolean array (True values will be masked). thresh : float, optional Detection threshold in sigmas above local background, for running initial SExtractor object detection. aper : float, optional Circular aperture radius in pixels, to be used for PSF normalization. Should contain most of object flux. If not specified, will be estimated as twice the FWHM. r0 : float, optional Smoothing kernel size (sigma) to be used for improving object detection in initial SExtractor call. gain : float, optional Image gain. minarea : int, optional Minimal number of pixels in the object to be considered a detection (``DETECT_MINAREA`` parameter of SExtractor). vignet_size : int, optional The size of *postage stamps* to be used for PSF model creation. psf_size : int, optional The size of the supersampled PSF model. order : int, optional Spatial order of PSF model variance. sex_extra : dict, optional Dictionary of additional options to be passed to SExtractor for initial object detection (``extra`` parameter of :func:`stdpipe.photometry.get_objects_sextractor`). checkimages : list, optional List of PSFEx checkimages to return along with PSF model. extra : dict, optional Dictionary of extra configuration parameters to be passed to PSFEx call, with keys as parameter names. See :code:`psfex -dd` for the full list. psffile : str, optional If specified, PSF model file will also be stored under this file name, so that it may e.g. be re-used by SExtractor later. get_obj : bool, optional If set, also return the table with SExtractor detected objects. _workdir : str, optional If specified, all temporary files will be created in this directory, and will be kept intact after running SExtractor and PSFEx. May be used for debugging exact inputs and outputs of the executable. _tmpdir : str, optional If specified, all temporary files will be created in a dedicated directory (that will be deleted after running the executable) inside this path. _exe : str, optional Full path to PSFEx executable. If not provided, the code tries to locate it automatically in your :envvar:`PATH`. _sex_exe : str, optional Full path to SExtractor executable. If not provided, the code tries to locate it automatically in your :envvar:`PATH`. verbose : bool or callable, optional Whether to show verbose messages during the run of the function or not. Returns ------- dict PSF structure corresponding to the built PSFEx model. The structure has at least the following fields: - ``width``, ``height`` - dimensions of supersampled PSF stamp - ``fwhm`` - mean full width at half maximum (FWHM) of the images used for building the PSF model - ``sampling`` - conversion factor between PSF stamp (supersampled) pixel size, and original image one (less than unity when supersampled resolution is finer than original image one) - ``ncoeffs`` - number of coefficients pixel polynomials have - ``degree`` - polynomial degree of a spatial variance of PSF model - ``data`` - the data containing per-pixel polynomial coefficients for PSF model - ``header`` - original FITS header of PSF model file, if :code:`get_header=True` parameter was set This structure corresponds to the contents of original PSFEx generated output file that is documented at https://psfex.readthedocs.io/en/latest/Appendices.html#psf-file-format-description """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Find the binary binname = None if _exe is not None: # Check user-provided binary path, and fail if not found if os.path.isfile(_exe): binname = _exe else: # Find PSFEx binary in common paths for exe in ['psfex']: binname = shutil.which(exe) if binname is not None: break if binname is None: log("Can't find PSFEx binary") return None # else: # log("Using PSFEx binary at", binname) workdir = _workdir if _workdir is not None else tempfile.mkdtemp(prefix='psfex', dir=_tmpdir) psf = None # Estimate image FWHM if aperture radius is not set if sex_extra is None: sex_extra = {} if checkimages is None: checkimages = [] if extra is None: extra = {} if not aper: log('Aperture size not specified, will estimate it from image FWHM') obj = photometry.get_objects_sextractor( image, mask=mask, thresh=thresh, aper=3.0, r0=r0, gain=gain, minarea=minarea, _workdir=workdir, _tmpdir=_tmpdir, _exe=_sex_exe, verbose=verbose, extra=sex_extra, ) fwhm_vals = obj['fwhm'][obj['flags'] == 0] if len(fwhm_vals) > 0: fwhm = np.median(fwhm_vals) else: fwhm = np.nan if not np.isfinite(fwhm) or fwhm <= 0: fwhm = 3.0 log('FWHM estimate failed, using default %.1f pixels' % fwhm) aper = 2.0 * fwhm log('FWHM = %.1f pixels, will use aperture radius %.1f pixels' % (fwhm, aper)) if vignet_size is None: vignet_size = int(np.ceil(6 * aper)) + 1 else: vignet_size = int(np.round(vignet_size)) if vignet_size % 2 == 0: vignet_size += 1 log('Extracting PSF using vignette size %d x %d pixels' % (vignet_size, vignet_size)) # Run SExtractor on input image in current workdir so that the LDAC catalogue will be in out.cat there obj = photometry.get_objects_sextractor( image, mask=mask, thresh=thresh, aper=aper, r0=r0, gain=gain, minarea=minarea, _workdir=workdir, _tmpdir=_tmpdir, _exe=_sex_exe, verbose=verbose, extra_params=[ 'SNR_WIN', 'ELONGATION', 'VIGNET(%d,%d)' % (vignet_size, vignet_size), ], extra=sex_extra, ) catname = os.path.join(workdir, 'out.cat') psfname = os.path.join(workdir, 'out.psf') # Dummy config filename, to prevent loading from current dir confname = os.path.join(workdir, 'empty.conf') utils.file_write(confname) opts = { 'c': confname, 'VERBOSE_TYPE': 'QUIET', 'CHECKPLOT_TYPE': 'NONE', 'CHECKIMAGE_TYPE': 'NONE', 'PSFVAR_DEGREES': order, 'WRITE_XML': 'N', } checknames = [os.path.join(workdir, _.replace('-', 'M_') + '.fits') for _ in checkimages] if checkimages: opts['CHECKIMAGE_TYPE'] = ','.join(checkimages) opts['CHECKIMAGE_NAME'] = ','.join(checknames) opts.update(extra) if psf_size is not None: opts['PSF_SIZE'] = [psf_size, psf_size] # Build the command line cmd = binname + ' ' + shlex.quote(catname) + ' ' + utils.format_astromatic_opts(opts) if not verbose: cmd += ' > /dev/null 2>/dev/null' log('Will run PSFEx like that:') log(cmd) # Run the command! res = os.system(cmd) if res == 0 and os.path.exists(psfname): log('PSFEx run succeeded') psf = load_psf(psfname, verbose=verbose) # Check whether the PSF model extends beyond the vignet coverage. # PSFEx uses Lanczos interpolation with renormalization at boundaries # when resampling vignets into the PSF grid. When the PSF model is # larger than the vignet (in image pixels), the boundary pixels get # spurious non-zero values from the renormalized partial kernel, # which biases the PSF wings and corrupts flux measurements. psf_extent = psf['width'] * psf['sampling'] if psf_extent > vignet_size: import warnings warnings.warn( "PSF model extent (%.0f x %.0f pixels = psf_size %d x sampling %.3f) " "exceeds vignet size (%d pixels). This causes interpolation artifacts " "in the PSF wings that bias flux measurements. " "Either increase vignet_size to >= %.0f or decrease psf_size to <= %d." % ( psf_extent, psf_extent, psf['width'], psf['sampling'], vignet_size, psf_extent, int(vignet_size / psf['sampling']), ) ) if psffile is not None: shutil.copyfile(psfname, psffile) log("PSF model stored to", psffile) else: log("Error", res, "running PSFEx") result = psf if checkimages: result = [result] for name in checknames: checkname = os.path.splitext(name)[0] + '_out.fits' result.append(fits.getdata(checkname)) if get_obj: if type(result) != list: result = [result] result.append(obj) if _workdir is None: shutil.rmtree(workdir) return result
[docs] def load_psf(filename, get_header=False, verbose=False): """Load PSF model from PSFEx file The structure may be useful for inspection of PSF model with :func:`stdpipe.psf.get_supersampled_psf_stamp` and :func:`stdpipe.psf.get_psf_stamp`, as well as for injection of PSF instances (fake objects) into the image with :func:`stdpipe.psf.place_psf_stamp`. Parameters ---------- filename : str Name of a file containing PSF model built by PSFEx. get_header : bool, optional Whether to return the original FITS header of PSF model file or not. If set, the header will be stored in the ``header`` field of the returned structure. verbose : bool or callable, optional Whether to show verbose messages during the run of the function or not. Returns ------- dict PSF structure in the same format as returned from :func:`stdpipe.psf.run_psfex`. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None log('Loading PSF model from %s' % filename) data = fits.getdata(filename, 1) header = fits.getheader(filename, 1) psf = { 'width': header.get('PSFAXIS1'), 'height': header.get('PSFAXIS2'), 'ncoeffs': header.get('PSFAXIS3'), 'fwhm': header.get('PSF_FWHM'), 'sampling': header.get('PSF_SAMP'), 'degree': header.get('POLDEG1', 0), 'x0': header.get('POLZERO1', 0), 'sx': header.get('POLSCAL1', 1), 'y0': header.get('POLZERO2', 0), 'sy': header.get('POLSCAL2', 1), } if get_header: psf['header'] = header psf['data'] = data[0][0] log( 'PSF model %d x %d pixels, FWHM %.1f pixels, sampling %.2f, degree %d' % (psf['width'], psf['height'], psf['fwhm'], psf['sampling'], psf['degree']) ) return psf
[docs] def bilinear_interpolate(im, x, y): """ Quick and dirty bilinear interpolation """ x = np.asarray(x) y = np.asarray(y) x0 = np.floor(x).astype(int) x1 = x0 + 1 y0 = np.floor(y).astype(int) y1 = y0 + 1 x0 = np.clip(x0, 0, im.shape[1] - 1) x1 = np.clip(x1, 0, im.shape[1] - 1) y0 = np.clip(y0, 0, im.shape[0] - 1) y1 = np.clip(y1, 0, im.shape[0] - 1) Ia = im[y0, x0] Ib = im[y1, x0] Ic = im[y0, x1] Id = im[y1, x1] wa = (x1 - x) * (y1 - y) wb = (x1 - x) * (y - y0) wc = (x - x0) * (y1 - y) wd = (x - x0) * (y - y0) return wa * Ia + wb * Ib + wc * Ic + wd * Id
[docs] def get_supersampled_psf_stamp(psf, x=0, y=0, normalize=True): """Returns supersampled PSF model for a given position inside the image. The returned stamp corresponds to PSF model evaluated at a given position inside the image, with its center always in the center of central stamp pixel. Every *supersampled* pixel of the stamp corresponds to :code:`psf['sampling']` pixels of the original image. Parameters ---------- psf : dict Input PSF structure as returned by :func:`stdpipe.psf.run_psfex` or :func:`stdpipe.psf.load_psf`. x : float, optional ``x`` coordinate of the position inside the original image to evaluate the PSF model. y : float, optional ``y`` coordinate of the position inside the original image to evaluate the PSF model. normalize : bool, optional Whether to normalize the stamp to have flux exactly equal to unity or not. Returns ------- numpy.ndarray Stamp of the PSF model evaluated at the given position inside the image. """ dx = 1.0 * (x - psf['x0']) / psf['sx'] dy = 1.0 * (y - psf['y0']) / psf['sy'] stamp = np.zeros(psf['data'].shape[1:], dtype=np.double) i = 0 for i2 in range(0, psf['degree'] + 1): for i1 in range(0, psf['degree'] + 1 - i2): stamp += psf['data'][i] * dx**i1 * dy**i2 i += 1 if normalize: total = np.sum(stamp) if np.isfinite(total) and total > 0: stamp /= total return stamp
[docs] def get_psf_stamp(psf, x=0, y=0, dx=None, dy=None, normalize=True): """Returns PSF stamp in original image pixel space with sub-pixel shift applied. The PSF model is evaluated at the requested position inside the original image, and then downscaled from supersampled pixels of the PSF model to original image pixels, and then adjusted to accommodate for requested :code:`(dx, dy)` sub-pixel shift. Stamp is odd-sized, with PSF center at:: x0 = floor(width/2) + dx y0 = floor(height/2) + dy If :code:`dx=None` or :code:`dy=None`, they are computed directly from the floating point parts of the position `x` and `y` arguments:: dx = x - round(x) dy = y - round(y) The stamp should directly represent stellar shape at a given position (including sub-pixel center shift) inside the image. Parameters ---------- psf : dict Input PSF structure as returned by :func:`stdpipe.psf.run_psfex` or :func:`stdpipe.psf.load_psf`. x : float, optional ``x`` coordinate of the position inside the original image to evaluate the PSF model. y : float, optional ``y`` coordinate of the position inside the original image to evaluate the PSF model. dx : float, optional Sub-pixel adjustment of PSF position in image space, ``x`` direction. dy : float, optional Sub-pixel adjustment of PSF position in image space, ``y`` direction. normalize : bool, optional Whether to normalize the stamp to have flux exactly equal to unity or not. Returns ------- numpy.ndarray Stamp of the PSF model evaluated at the given position inside the image, in original image pixels. """ if dx is None: dx = x - np.round(x) if dy is None: dy = y - np.round(y) supersampled = get_supersampled_psf_stamp(psf, x, y, normalize=normalize) # Oversampling factor N = int(round(1.0 / psf['sampling'])) if N > 1 and supersampled.shape[0] % N == 0 and supersampled.shape[1] % N == 0: # Flux-conserving downsampling: shift at oversampled resolution, then sum N×N blocks. # The oversampled data stores pixel-integrated flux per subpixel, so the correct # way to get image-pixel flux is to sum the subpixels within each image pixel. # Apply sub-pixel shift at oversampled resolution via cubic interpolation. # At the oversampled resolution the PSF is well-sampled, so cubic interpolation # accurately reconstructs the continuous PSF for sub-pixel shifts. shift_x_os = dx / psf['sampling'] shift_y_os = dy / psf['sampling'] if shift_x_os != 0 or shift_y_os != 0: shifted = ndimage.shift( supersampled, [shift_y_os, shift_x_os], order=3, mode='constant', cval=0 ) else: shifted = supersampled # Sum N×N blocks out_h = supersampled.shape[0] // N out_w = supersampled.shape[1] // N stamp = shifted[: out_h * N, : out_w * N].reshape(out_h, N, out_w, N).sum(axis=(1, 3)) else: # Fallback for non-integer oversampling or oversampling=1 ssx0 = (supersampled.shape[1] - 1) / 2.0 ssy0 = (supersampled.shape[0] - 1) / 2.0 x0 = np.floor(psf['width'] * psf['sampling'] / 2) y0 = np.floor(psf['height'] * psf['sampling'] / 2) width = int(x0) * 2 + 1 height = int(y0) * 2 + 1 x0 += dx y0 += dy y, x = np.mgrid[0:height, 0:width] x1 = ssx0 + (x - x0) / psf['sampling'] y1 = ssy0 + (y - y0) / psf['sampling'] stamp = ndimage.map_coordinates(supersampled, [y1, x1], order=3) / psf['sampling'] ** 2 if normalize: total = np.sum(stamp) if np.isfinite(total) and total > 0: stamp /= total return stamp
[docs] def place_psf_stamp(image, psf, x0, y0, flux=1, gain=None): """Places PSF stamp, scaled to a given flux, at a given position inside the image. PSF stamp is evaluated at a given position, then adjusted to accommodate for required sub-pixel shift, and finally scaled to requested flux value. Thus, the routine corresponds to injection of an artificial point source into the image. The stamp values are added on top of current content of the image. If `gain` value is set, the Poissonian noise is applied to the stamp. The image is modified in-place. Parameters ---------- image : numpy.ndarray The image where the artificial source will be injected (modified in-place). psf : dict Input PSF structure as returned by :func:`stdpipe.psf.run_psfex` or :func:`stdpipe.psf.load_psf`. x0 : float ``x`` coordinate of the position to inject the source. y0 : float ``y`` coordinate of the position to inject the source. flux : float, optional The source flux in ADU units. gain : float, optional Image gain value. If set, used to apply Poissonian noise to the source. """ stamp = get_psf_stamp(psf, x0, y0, normalize=True) stamp *= flux if gain is not None: idx = stamp > 0 # FIXME: what to do with negative points?.. stamp[idx] = np.random.poisson(stamp[idx] * gain) / gain # Integer coordinates inside the stamp y, x = np.mgrid[0 : stamp.shape[0], 0 : stamp.shape[1]] # Corresponding image pixels y1, x1 = np.mgrid[0 : stamp.shape[0], 0 : stamp.shape[1]] x1 += int(np.round(x0) - np.floor(stamp.shape[1] / 2)) y1 += int(np.round(y0) - np.floor(stamp.shape[0] / 2)) # Crop the coordinates outside target image idx = np.isfinite(stamp) idx &= (x1 >= 0) & (x1 < image.shape[1]) idx &= (y1 >= 0) & (y1 < image.shape[0]) # Add the stamp to the image! image[y1[idx], x1[idx]] += stamp[y[idx], x[idx]]
[docs] def create_psf_model( image, obj=None, fwhm=None, size=None, mask=None, oversampling=None, degree=0, regularization=1e-6, subtract_neighbors=True, subtract_background=False, isolation=2.0, get_raw=False, verbose=False, ): """ Create an empirical PSF (ePSF) model from stars in the image. For ``degree=0`` (default), builds a position-invariant ePSF using photutils ``EPSFBuilder`` (iterative recentering and stacking). For ``degree > 0``, builds a position-dependent PSF model by fitting per-pixel polynomial coefficients to resampled star stamps, following the same approach as PSFEx. The polynomial model is: .. math:: PSF(i,j; x,y) = \\sum_k c_k(i,j) \\cdot dx^{p1_k} \\cdot dy^{p2_k} where ``(dx, dy)`` are normalized image coordinates and ``(p1_k, p2_k)`` are polynomial exponents with ``p1_k + p2_k <= degree``. The returned dictionary structure is compatible with PSFEx output from :func:`stdpipe.psf.run_psfex` and can be used with the same evaluation functions like :func:`stdpipe.psf.get_psf_stamp`. Parameters ---------- image : numpy.ndarray Input image as a NumPy array, must be background subtracted. obj : astropy.table.Table, optional Table of star positions. If None, stars will be detected automatically. Should have 'x', 'y' columns and optionally 'flux'. fwhm : float, optional Approximate FWHM of stars in pixels. If None, will be estimated. size : int, optional Size of cutouts to extract around stars (should be odd). If None, automatically determined from FWHM as ``max(15, round_up_to_odd(5 * fwhm))``. mask : numpy.ndarray, optional Image mask as a boolean array (True values will be masked). oversampling : int, optional Oversampling factor for the ePSF. If None (default), it is auto-selected from the FWHM: ``1`` when ``fwhm >= 2.5`` image pixels (well-sampled PSF) and ``2`` otherwise (under-sampled PSF). Pass an explicit integer to override. degree : int, optional Polynomial degree for spatial PSF variation (default: 0 = constant). Degree 1 = linear (3 coefficients), degree 2 = quadratic (6 coefficients), etc. regularization : float, optional Tikhonov regularization parameter for polynomial fitting (default: 1e-6). Only used when ``degree > 0``. Set to 0 for unregularized least-squares. subtract_neighbors : bool, optional If True (default), subtract estimated flux from neighboring stars before extracting cutouts. Reduces contamination in crowded fields. Only used when ``degree > 0``. subtract_background : bool or {'none', 'median', 'plane'}, optional Local background handling for each training stamp before normalization. ``False`` or ``'none'`` leaves the current image values unchanged, ``True`` or ``'median'`` subtracts the median of the stamp border, and ``'plane'`` fits and subtracts a tilted background plane from the border pixels. Only used when ``degree > 0``. When building from the raw image instead of a background-subtracted one, ``'plane'`` is usually the most robust choice. isolation : float, optional Minimum nearest-neighbor distance in FWHM units for selecting stars for ePSF building (default: 2.0). Stars with a neighbor closer than ``isolation * fwhm`` are excluded. Combined with ``subtract_neighbors=True``, the lower default value works in both sparse and dense fields. Set to 0 or None to disable isolation filtering. get_raw : bool, optional If True and ``degree=0``, returns raw photutils EPSFModel object. Ignored when ``degree > 0``. verbose : bool or callable, optional Whether to show verbose messages. Returns ------- dict Dictionary with PSFEx-compatible structure containing the PSF model. """ log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Detect stars if not provided if obj is None: log('Detecting stars for ePSF building') obj = photometry.get_objects_sep(image, mask=mask, thresh=5.0, aper=3.0, verbose=verbose) # Select isolated, bright, non-saturated stars # Simple selection: median flux and not too crowded if len(obj) == 0: raise ValueError("No stars detected for ePSF building") # Determine FWHM and stamp size early so we can use them for edge filtering if fwhm is None: if 'fwhm' in obj.colnames: fwhm = np.median(obj['fwhm']) log('Using median FWHM: %.2f pixels' % fwhm) else: fwhm = 3.0 log('FWHM not available, using default: %.2f pixels' % fwhm) if size is None: size = max(15, int(np.ceil(5 * fwhm))) if size % 2 == 0: size += 1 flux_median = np.median(obj['flux']) flux_std = np.std(obj['flux']) # Select stars with flux within reasonable range idx = (obj['flux'] > flux_median) & (obj['flux'] < flux_median + 3 * flux_std) # Remove edge objects edge = size idx &= (obj['x'] > edge) & (obj['x'] < image.shape[1] - edge) idx &= (obj['y'] > edge) & (obj['y'] < image.shape[0] - edge) # Remove flagged objects if 'flags' in obj.colnames: idx &= obj['flags'] == 0 obj = obj[idx] log('Selected %d stars for ePSF building' % len(obj)) if fwhm is None: if 'fwhm' in obj.colnames: fwhm = np.median(obj['fwhm']) log('Using median FWHM: %.2f pixels' % fwhm) else: fwhm = 3.0 log('FWHM not available, using default: %.2f pixels' % fwhm) # Auto-size stamps based on FWHM if not specified. # 5*FWHM keeps the stamp small enough to be cheap in sep.psf_fit while # still covering ~3-sigma of the PSF wings. The 8*FWHM legacy default # over-extends in dense fields (large fit groups, slow rendering). if size is None: size = max(15, int(np.ceil(5 * fwhm))) if size % 2 == 0: size += 1 # Make sure size is odd log('Using stamp size: %d pixels (FWHM=%.1f)' % (size, fwhm)) # Auto-pick oversampling from FWHM if not explicitly set. # FWHM >= 2.5 image pixels is well-sampled enough that oversampling=1 is # adequate (saves model storage and SEP rendering cost). Smaller FWHM # (under-sampled PSFs) needs oversampling=2 to avoid pixelisation bias. if oversampling is None: oversampling = 1 if fwhm >= 2.5 else 2 log('Auto-selected oversampling=%d (FWHM=%.2f pix)' % (oversampling, fwhm)) # Filter by isolation: reject stars with a neighbor closer than isolation * fwhm. # Neighbor contamination in star stamps is the primary source of ePSF bias # in crowded fields; selecting only isolated stars dramatically improves # ePSF quality (tested: reduces bias from +30% to -1% at 6 FWHM separation). if isolation and isolation > 0 and len(obj) > 1: from scipy.spatial import cKDTree min_dist = isolation * fwhm tree = cKDTree(np.c_[obj['x'], obj['y']]) nn_dist = tree.query(np.c_[obj['x'], obj['y']], k=2)[0][:, 1] isolated = obj[nn_dist > min_dist] n_before = len(obj) if len(isolated) >= 10: obj = isolated log( 'Isolation filter (>%.0f*FWHM = >%.1f px): %d / %d stars selected' % (isolation, min_dist, len(obj), n_before) ) else: # Not enough isolated stars; fall back to the most isolated ones n_fallback = max(10, n_before // 5) idx = np.argsort(-nn_dist)[:n_fallback] obj = obj[idx] log( 'Isolation filter: only %d stars with >%.1f px separation; ' 'using %d most isolated instead' % (len(isolated), min_dist, len(obj)) ) background_mode = _normalize_stamp_background_mode(subtract_background) if degree == 0: if background_mode != 'none': log('Ignoring local stamp background mode for position-invariant ePSF builder') return _create_psf_model_constant( image, obj, fwhm, size, mask, oversampling, get_raw, verbose, log ) else: return _create_psf_model_polynomial( image, obj, fwhm, size, mask, oversampling, degree, regularization, subtract_neighbors, background_mode, log, )
def _create_psf_model_constant(image, obj, fwhm, size, mask, oversampling, get_raw, verbose, log): """Build position-invariant ePSF using photutils EPSFBuilder.""" log('Extracting %dx%d cutouts around %d stars' % (size, size, len(obj))) nddata = NDData(data=image, mask=mask) # Extract stars using photutils stars = photutils.psf.extract_stars(nddata, Table({'x': obj['x'], 'y': obj['y']}), size=size) # Build ePSF log('Building ePSF model with oversampling=%d' % oversampling) epsf_builder = photutils.psf.EPSFBuilder( oversampling=oversampling, maxiters=10, progress_bar=bool(verbose) ) epsf, fitted_stars = epsf_builder(stars) log('ePSF building complete. PSF shape: %s' % str(epsf.data.shape)) if get_raw: return epsf # Build PSFEx-compatible structure psf_data = epsf.data # Reshape to (ncoeffs, height, width) format # ePSF is position-invariant, so ncoeffs=1 if psf_data.ndim == 2: psf_data = psf_data[np.newaxis, :, :] # Add coefficient dimension psf = { 'width': psf_data.shape[2], 'height': psf_data.shape[1], 'fwhm': fwhm, 'sampling': 1.0 / oversampling, # PSFEx convention: < 1 means supersampled 'ncoeffs': 1, 'degree': 0, # Constant PSF (no position dependence) 'x0': 0, 'y0': 0, 'sx': 1, 'sy': 1, 'data': psf_data, 'oversampling': oversampling, # Keep for reference 'type': 'epsf', # Identify PSF type } return psf def _build_polynomial_psf_taper(reference_stamp, sampling, fwhm): """Build a smooth radial taper for low-S/N outer PSF pixels.""" h, w = reference_stamp.shape y, x = np.mgrid[0:h, 0:w] cx = 0.5 * (w - 1) cy = 0.5 * (h - 1) r = np.sqrt(((x - cx) * sampling) ** 2 + ((y - cy) * sampling) ** 2) peak = float(np.nanmax(reference_stamp)) half_width = 0.5 * (min(h, w) - 1) * sampling taper_start = 0.75 * half_width threshold = 0.02 * peak if np.isfinite(peak) and peak > 0: edges = np.arange(0, np.max(r) + sampling, sampling) centers = 0.5 * (edges[:-1] + edges[1:]) profile = [] for r0, r1 in zip(edges[:-1], edges[1:]): idx = (r >= r0) & (r < r1) profile.append(np.nanmedian(reference_stamp[idx]) if np.any(idx) else np.nan) profile = np.asarray(profile, float) idx = np.where(np.isfinite(profile) & (profile <= threshold))[0] if len(idx): taper_start = float(centers[idx[0]]) taper_start = float( np.clip( taper_start, max(1.25 * fwhm, sampling), max(1.25 * fwhm, half_width - 2 * sampling), ) ) taper_width = max(half_width - taper_start, 0.0) if taper_width <= 0: return np.ones_like(reference_stamp), taper_start, taper_width window = np.ones_like(reference_stamp) idx = r > taper_start if np.any(idx): phase = np.clip((r[idx] - taper_start) / taper_width, 0.0, 1.0) window[idx] = 0.5 * (1.0 + np.cos(np.pi * phase)) window[r >= half_width] = 0.0 return window, taper_start, taper_width def _regularize_polynomial_psf(coeffs, stamps_array, keep, sampling, fwhm): """Suppress spurious outer support and restore polynomial PSF normalization.""" reference_stamp = np.nanmedian(stamps_array[keep], axis=0) window, taper_start, taper_width = _build_polynomial_psf_taper(reference_stamp, sampling, fwhm) coeffs *= window[np.newaxis, :, :] template = np.clip(reference_stamp * window, 0.0, None) total = float(np.sum(template)) if not np.isfinite(total) or total <= 0: template = window.copy() total = float(np.sum(template)) template /= total plane_sums = coeffs.reshape(coeffs.shape[0], -1).sum(axis=1) target_sums = np.zeros_like(plane_sums) target_sums[0] = 1.0 coeffs += (target_sums - plane_sums)[:, np.newaxis, np.newaxis] * template[np.newaxis, :, :] return coeffs, taper_start, taper_width def _normalize_stamp_background_mode(mode): """Normalize public stamp-background options to internal mode names.""" if mode is None: return 'none' if isinstance(mode, (bool, np.bool_)): return 'median' if bool(mode) else 'none' if isinstance(mode, str): normalized = mode.strip().lower() aliases = { 'none': 'none', 'median': 'median', 'edge': 'median', 'constant': 'median', 'plane': 'plane', 'tilt': 'plane', } if normalized in aliases: return aliases[normalized] raise ValueError( "subtract_background should be False/True or one of " "{'none', 'median', 'plane'}" ) def _subtract_local_stamp_background(cutout, mask_cutout=None, mode='none', border=2): """Remove a local constant or tilted background from a stellar cutout.""" mode = _normalize_stamp_background_mode(mode) if mode == 'none': return cutout h, w = cutout.shape border = int(np.clip(border, 1, max(1, min(h, w) // 2))) edge = np.zeros_like(cutout, dtype=bool) edge[:border, :] = True edge[-border:, :] = True edge[:, :border] = True edge[:, -border:] = True valid = edge & np.isfinite(cutout) if mask_cutout is not None: valid &= ~np.asarray(mask_cutout, bool) if not np.any(valid): return cutout values = cutout[valid] if mode == 'median' or np.sum(valid) < 6: cutout -= np.median(values) return cutout yy, xx = np.mgrid[0:h, 0:w] xx0 = xx.astype(np.float64) - 0.5 * (w - 1) yy0 = yy.astype(np.float64) - 0.5 * (h - 1) x = xx0[valid] y = yy0[valid] z = values.astype(np.float64) keep = np.ones(len(z), dtype=bool) for _ in range(2): if np.sum(keep) < 3: break A = np.column_stack([np.ones(np.sum(keep)), x[keep], y[keep]]) coeffs, _, _, _ = np.linalg.lstsq(A, z[keep], rcond=None) model = coeffs[0] + coeffs[1] * x + coeffs[2] * y resid = z - model med = np.median(resid[keep]) mad = np.median(np.abs(resid[keep] - med)) * 1.4826 if not np.isfinite(mad) or mad <= 0: break new_keep = np.abs(resid - med) <= 3.0 * mad if np.sum(new_keep) < 3 or np.array_equal(new_keep, keep): break keep = new_keep if np.sum(keep) < 3: cutout -= np.median(values) return cutout A = np.column_stack([np.ones(np.sum(keep)), x[keep], y[keep]]) coeffs, _, _, _ = np.linalg.lstsq(A, z[keep], rcond=None) background = coeffs[0] + coeffs[1] * xx0 + coeffs[2] * yy0 cutout -= background return cutout def _create_psf_model_polynomial( image, obj, fwhm, size, mask, oversampling, degree, regularization, subtract_neighbors, background_mode, log, ): """Build position-dependent PSF by fitting per-pixel polynomials to star stamps. Follows the PSFEx algorithm: resamples star cutouts onto an oversampled grid, then fits polynomial coefficients per pixel via least squares. """ ncoeffs = (degree + 1) * (degree + 2) // 2 log('Building position-dependent PSF model: degree=%d (%d coefficients)' % (degree, ncoeffs)) if background_mode != 'none': log('Subtracting local stamp background using %s model' % background_mode) if len(obj) < ncoeffs: raise ValueError( "Need at least %d stars for degree=%d polynomial, got %d" % (ncoeffs, degree, len(obj)) ) sampling = 1.0 / oversampling # Oversampled stamp size (keep odd for centered stamps) os_size = size * oversampling if os_size % 2 == 0: os_size += 1 os_center = (os_size - 1) / 2.0 cut_center = (size - 1) / 2.0 # Normalization parameters: image center and half-size # This maps coordinates to approximately [-1, 1] range x0 = image.shape[1] / 2.0 y0 = image.shape[0] / 2.0 sx = image.shape[1] / 2.0 sy = image.shape[0] / 2.0 # Oversampled coordinate grid (computed once, reused for all stamps) oy, ox = np.mgrid[0:os_size, 0:os_size] ox_rel = (ox - os_center) * sampling # relative to stamp center, in image pixels oy_rel = (oy - os_center) * sampling # Build neighbor subtraction image if requested # We subtract Gaussian approximations of all OTHER stars from each cutout if subtract_neighbors and 'flux' in obj.colnames: sigma = fwhm / 2.3548 # FWHM to sigma star_x = np.array(obj['x'], dtype=np.float64) star_y = np.array(obj['y'], dtype=np.float64) star_flux = np.array(obj['flux'], dtype=np.float64) else: subtract_neighbors = False # Extract and resample stamps stamps = [] positions_x = [] positions_y = [] stamp_fluxes = [] half = size // 2 for i in range(len(obj)): x_star = float(obj['x'][i]) y_star = float(obj['y'][i]) # Integer center and sub-pixel offset ix = int(np.round(x_star)) iy = int(np.round(y_star)) dx = x_star - ix dy = y_star - iy # Cutout boundaries x1, x2 = ix - half, ix + half + 1 y1, y2 = iy - half, iy + half + 1 # Skip if cutout extends beyond image if x1 < 0 or x2 > image.shape[1] or y1 < 0 or y2 > image.shape[0]: continue cutout = image[y1:y2, x1:x2].astype(np.float64).copy() # Subtract estimated neighbor flux from cutout if subtract_neighbors: # Pixel coordinate grids for this cutout cy, cx = np.mgrid[y1:y2, x1:x2] for j in range(len(obj)): if j == i: continue # Only consider neighbors close enough to affect this cutout ndx = star_x[j] - ix ndy = star_y[j] - iy if abs(ndx) > half + 5 * sigma and abs(ndy) > half + 5 * sigma: continue # Subtract Gaussian approximation r2 = (cx - star_x[j]) ** 2 + (cy - star_y[j]) ** 2 amp = star_flux[j] / (2 * np.pi * sigma**2) cutout -= amp * np.exp(-r2 / (2 * sigma**2)) # Skip if mask has too many bad pixels in this cutout mask_cutout = None if mask is not None: mask_cutout = mask[y1:y2, x1:x2] if np.sum(mask_cutout) > 0.1 * cutout.size: continue cutout[mask_cutout] = 0.0 if background_mode != 'none': cutout = _subtract_local_stamp_background(cutout, mask_cutout, mode=background_mode) # Resample to oversampled grid, centering star at stamp center # Map each oversampled pixel back to cutout coordinates cutout_x = cut_center + dx + ox_rel cutout_y = cut_center + dy + oy_rel stamp = ndimage.map_coordinates( cutout, [cutout_y, cutout_x], order=3, mode='constant', cval=0.0 ) # Normalize to unit flux total = np.sum(stamp) if not np.isfinite(total) or total <= 0: continue stamp /= total stamps.append(stamp) positions_x.append(x_star) positions_y.append(y_star) stamp_fluxes.append(float(obj['flux'][i]) if 'flux' in obj.colnames else 1.0) nstars = len(stamps) log('Extracted %d valid stamps for polynomial fitting' % nstars) if nstars < ncoeffs: raise ValueError( "Only %d valid stamps, need at least %d for degree=%d" % (nstars, ncoeffs, degree) ) # Build Vandermonde matrix [nstars x ncoeffs] # Same term ordering as get_supersampled_psf_stamp: i2 outer, i1 inner x_norm = (np.array(positions_x) - x0) / sx y_norm = (np.array(positions_y) - y0) / sy stamp_fluxes = np.asarray(stamp_fluxes, dtype=float) V = [] for i2 in range(degree + 1): for i1 in range(degree + 1 - i2): V.append(x_norm**i1 * y_norm**i2) V = np.column_stack(V) # Stack stamps as [nstars, npixels] stamps_array = np.array(stamps) # [nstars, os_size, os_size] pixels = stamps_array.reshape(nstars, -1) # [nstars, npixels] # Each training stamp is normalized to unit flux, so an unweighted solve # over-emphasizes low-S/N outer pixels from fainter stars and broadens the # reconstructed wings. Weight by source flux as a proxy for stamp S/N. weights = np.ones(nstars, dtype=np.float64) valid_flux = np.isfinite(stamp_fluxes) & (stamp_fluxes > 0) if np.any(valid_flux): flux_ref = np.nanmedian(stamp_fluxes[valid_flux]) if np.isfinite(flux_ref) and flux_ref > 0: weights[valid_flux] = stamp_fluxes[valid_flux] / flux_ref weights = np.clip(weights, 0.3, 5.0) log( 'Applying flux weights to polynomial PSF fit: median %.0f, range %.2f..%.2f' % (flux_ref, np.min(weights), np.max(weights)) ) # Solve per-pixel polynomial with iterative sigma-clipping. # Outlier stamps (from neighbor contamination or artifacts) bias # the mean-based least-squares fit; clipping rejects them. keep = np.ones(nstars, dtype=bool) clip_sigma = 3.0 max_clip_iters = 3 with np.errstate(over='ignore', invalid='ignore', divide='ignore'): for clip_iter in range(max_clip_iters + 1): w_k = weights[keep] V_k = V[keep] * w_k[:, np.newaxis] pixels_k = pixels[keep] * w_k[:, np.newaxis] n_keep = int(keep.sum()) if n_keep < ncoeffs: break if regularization > 0: VTV = V_k.T @ V_k + regularization * np.eye(ncoeffs) VTp = V_k.T @ pixels_k coeffs = np.linalg.solve(VTV, VTp) else: coeffs, _, _, _ = np.linalg.lstsq(V_k, pixels_k, rcond=None) if clip_iter == max_clip_iters: break # Compute per-stamp RMS residual reconstructed = V @ coeffs # all stamps, not just kept residuals = pixels - reconstructed stamp_rms = np.sqrt(np.mean(residuals**2, axis=1)) med_rms = np.median(stamp_rms[keep]) mad_rms = np.median(np.abs(stamp_rms[keep] - med_rms)) * 1.4826 if mad_rms < 1e-15: break new_keep = stamp_rms < med_rms + clip_sigma * mad_rms n_rejected = int(keep.sum() - new_keep.sum()) if n_rejected == 0: break if new_keep.sum() < ncoeffs: break keep = new_keep log( 'Sigma-clip iter %d: rejected %d stamps, %d remaining' % (clip_iter + 1, n_rejected, keep.sum()) ) # Suppress low-S/N outer support where the polynomial fit otherwise tends # to create square-edge pedestals and ringing. psf_data, taper_start, taper_width = _regularize_polynomial_psf( coeffs.reshape(ncoeffs, os_size, os_size), stamps_array, keep, sampling, fwhm, ) coeffs = psf_data.reshape(ncoeffs, -1) if taper_width > 0: log( 'Applied outer taper to polynomial PSF: start %.2f px, width %.2f px' % (taper_start, taper_width) ) # Compute and report residual statistics with np.errstate(over='ignore', invalid='ignore', divide='ignore'): reconstructed = V[keep] @ coeffs residuals = pixels[keep] - reconstructed rms = np.sqrt(np.nanmean(residuals**2)) log( 'Polynomial PSF fit: %d x %d pixels, %d coefficients, ' '%d/%d stamps used, RMS residual %.2e (per pixel, normalized)' % (os_size, os_size, ncoeffs, int(keep.sum()), nstars, rms) ) psf = { 'width': os_size, 'height': os_size, 'fwhm': fwhm, 'sampling': sampling, 'ncoeffs': ncoeffs, 'degree': degree, 'x0': x0, 'y0': y0, 'sx': sx, 'sy': sy, 'data': psf_data, 'oversampling': oversampling, 'type': 'epsf', } return psf
[docs] def enclosed_psf_fraction(psf, x=0, y=0, radius=None, subpixel=10): """Fraction of normalised PSF flux inside a circular aperture. Evaluates the (possibly position-dependent) PSF model at ``(x, y)`` via :func:`get_psf_stamp`, normalises it to unit total flux, and sums the pixels lying within ``radius`` of the stamp centre. Useful for on-the-fly aperture corrections. Pixels are weighted by the fractional coverage of the circular aperture using a sub-pixel grid (``subpixel`` × ``subpixel`` samples per pixel) — set ``subpixel=1`` to fall back to a hard pixel-centre mask, which is faster but has up to ~10 % radius-dependent jitter when the aperture edge cuts through individual pixels. Parameters ---------- psf : dict PSF model from :func:`run_psfex`, :func:`load_psf`, or :func:`create_psf_model`. x, y : float, optional Position in image pixels at which to evaluate a position-dependent PSF model. The sub-pixel parts of ``x`` and ``y`` are folded into the stamp centre via :func:`get_psf_stamp`. radius : float or array-like Aperture radius (or radii) in image pixels. subpixel : int, optional Linear sub-pixel sampling per image pixel for partial-coverage weighting. Default 10 (100 sub-samples per pixel). Returns ------- float or ndarray Enclosed flux fraction at each radius. Scalar when ``radius`` is a scalar, otherwise an array of the same shape. """ if radius is None: raise TypeError("radius is required") radius_arr = np.atleast_1d(np.asarray(radius, float)) stamp = get_psf_stamp(psf, x, y, normalize=True) h, w = stamp.shape cx = w // 2 + (x - np.round(x)) cy = h // 2 + (y - np.round(y)) if int(subpixel) <= 1: yy, xx = np.mgrid[0:h, 0:w] rr2 = (xx - cx) ** 2 + (yy - cy) ** 2 out = np.array([float(np.sum(stamp[rr2 <= r ** 2])) for r in radius_arr]) else: n = int(subpixel) # Sub-pixel grid of offsets within a single pixel, centred on 0. sub = (np.arange(n) + 0.5) / n - 0.5 dyy, dxx = np.meshgrid(sub, sub, indexing='ij') yy, xx = np.mgrid[0:h, 0:w] # For each pixel, distance² from aperture centre at every sub-pixel rr2 = (xx[:, :, None, None] - cx + dxx[None, None, :, :]) ** 2 \ + (yy[:, :, None, None] - cy + dyy[None, None, :, :]) ** 2 out = np.empty(radius_arr.size, dtype=float) for i, r in enumerate(radius_arr): frac = np.mean(rr2 <= r ** 2, axis=(2, 3)) out[i] = float(np.sum(stamp * frac)) return float(out[0]) if np.isscalar(radius) or np.ndim(radius) == 0 else out
[docs] def select_psf_seeds( obj, image_shape, *, max_per_tile=25, grid=6, edge=20, obj_col_x='x', obj_col_y='y', obj_col_flux='flux', obj_col_flags='flags', accept_flags=0, ): """Select bright, edge-clear sources spread uniformly across the field. Bins ``obj`` on a regular ``grid_x × grid_y`` spatial grid and returns the ``max_per_tile`` brightest sources per cell that pass basic quality filters: finite ``x``/``y``/flux, positive flux, optionally a flag mask, and inside the image after an ``edge``-pixel margin. The returned table is a strict subset of ``obj`` (same columns, sub-selected rows) and is the natural input for :func:`stdpipe.psf.create_psf_model` and similar PSF-modelling routines that benefit from a uniform spatial sampling. Parameters ---------- obj : astropy.table.Table Source catalogue. image_shape : (H, W) tuple Image shape used for the spatial grid and edge mask. max_per_tile : int Maximum number of seeds kept per spatial cell. grid : int or (nx, ny) tuple Number of cells in the spatial grid. A scalar means a square ``grid × grid`` layout. edge : float Edge margin in pixels; sources within ``edge`` of any image boundary are excluded. obj_col_x, obj_col_y : str Column names for source positions in pixel coordinates. obj_col_flux : str Column name used to rank candidates within each cell (brightest first). Sources with non-positive values are dropped. obj_col_flags : str, optional Column name for a SExtractor-style integer flag mask. Set to ``None`` to skip flag filtering. If the column is missing from ``obj``, no flag filter is applied either. accept_flags : int Bitwise mask of acceptable flags. A source is kept only when ``flags & ~accept_flags == 0``. Returns ------- astropy.table.Table A subset of ``obj`` containing the selected seeds. """ if isinstance(grid, int): grid_x = grid_y = grid else: grid_x, grid_y = grid H, W = image_shape x = np.asarray(obj[obj_col_x], float) y = np.asarray(obj[obj_col_y], float) flux = np.asarray(obj[obj_col_flux], float) good = ( np.isfinite(x) & np.isfinite(y) & np.isfinite(flux) & (flux > 0) & (x > edge) & (x < W - edge) & (y > edge) & (y < H - edge) ) if obj_col_flags is not None and obj_col_flags in obj.colnames: flags = np.asarray(obj[obj_col_flags], int) good &= (flags & ~int(accept_flags)) == 0 if not np.any(good): return obj[good] # empty subset, preserves columns cand_idx = np.where(good)[0] cx = x[cand_idx]; cy = y[cand_idx]; cflux = flux[cand_idx] xbin = np.clip((cx / W * grid_x).astype(int), 0, grid_x - 1) ybin = np.clip((cy / H * grid_y).astype(int), 0, grid_y - 1) keep = [] for iy in range(grid_y): for ix in range(grid_x): sel = np.where((xbin == ix) & (ybin == iy))[0] if sel.size: order = np.argsort(cflux[sel])[::-1][:max_per_tile] keep.append(cand_idx[sel[order]]) if not keep: return obj[np.zeros(len(obj), dtype=bool)] return obj[np.unique(np.concatenate(keep))]