Source code for stdpipe.subtraction

import os, shutil, tempfile, shlex
import numpy as np

from astropy.io import fits
from astropy.stats import mad_std
from astropy.table import Table


# Drop-in replacement for numpy.fft which is supposedly faster
import pyfftw
import pyfftw.interfaces.numpy_fft as fft

pyfftw.interfaces.cache.enable()
pyfftw.interfaces.cache.set_keepalive_time(1.0)

from scipy import ndimage
from scipy.optimize import least_squares
import statsmodels.api as sm

import sep

from . import astrometry
from . import pipeline
from . import psf
from . import sfft as sfft_module


[docs] def run_hotpants( image, template, mask=None, template_mask=None, err=None, template_err=None, extra=None, image_fwhm=None, template_fwhm=None, image_gain=None, template_gain=1000, rel_r=3, rel_rss=4, nx=1, ny=None, obj=None, get_convolved=False, get_scaled=False, get_noise=False, get_kernel=False, get_header=False, _tmpdir=None, _workdir=None, _exe=None, verbose=False, ): """Wrapper for running HOTPANTS to subtract a template from science image. To better understand the logic and available subtraction options you may check HOTPANTS documentation at https://github.com/acbecker/hotpants The routine tries to be clever and select reasonable defaults for running HOTPANTS, but you may always directly specify any HOTPANTS option manually through `extra` parameter. The noise model for both science and template images may optionally be provided as an external parameters (`err` and `template_err`). Moreover, if these parameters are set to `True`, the routine will try to build noise models itself. To do so, it will estimate the image background and background RMS. Then, this background RMS map is used as a primary component of the noise map. On top of that, the contribution of the sources is added by subtracting the background from original image, smoothing it a bit to mitigate pixel-level noise a bit, and then converting everything above zero to corresponding Poissonian noise contribution by dividing with gain value and taking a square root. `rel_r` and `rel_rss` define the scaling of corresponding HOTPANTS parameters (which are the convolution kernel half width and half-width of the sub-stamps used for kernel fitting) with image FWHM. The routine also uses "officially recommedned" logic for defining HOTPANTS `ng` parameter, that is to set it to :code:`[3, 6, 0.5*sigma_match, 4, 1.0*sigma_match, 2, 2.0*sigma_match]` where :code:`sigma_match = np.sqrt(image_fwhm**2 - template_fwhm**2) / 2.35`. This approach assumes that :code:`image_fwhm > template_fwhm`, and of course needs `image_fwhm` and `template_fwhm` to be provided. Parameters ---------- image : numpy.ndarray Input science image as a NumPy array. template : numpy.ndarray Input template image, should have the same shape as the science image. mask : numpy.ndarray, optional Science image mask as a boolean array (True values will be masked). template_mask : numpy.ndarray, optional Template image mask as a boolean array (True values will be masked). err : numpy.ndarray or bool, optional Science image error map (expected RMS of every pixel). If set to True, the code will try to build the noise map directly from the image and ``image_gain`` parameter. template_err : numpy.ndarray or bool, optional Template image error map. If set to True, the code will try to build the noise map directly from the template and ``template_gain`` parameter. extra : dict, optional Extra parameters to be passed to HOTPANTS executable, with parameter names as keys. image_fwhm : float, optional FWHM of science image in pixels. template_fwhm : float, optional FWHM of template image in pixels. image_gain : float, optional Gain of science image. template_gain : float, optional Gain of template image. rel_r : float, optional If specified, HOTPANTS ``r`` parameters will be set to :code:`image_fwhm*rel_r`. rel_rss : float, optional If specified, HOTPANTS ``rss`` parameters will be set to :code:`image_fwhm*rel_rss`. nx : int, optional Number of image sub-regions in ``x`` direction. ny : int, optional Number of image sub-regions in ``y`` direction. obj : astropy.table.Table, optional List of objects detected on science image. If provided, it will be used to better place the sub-stamps used to derive the convolution kernel. get_convolved : bool, optional Whether to also return the convolved template image. get_scaled : bool, optional Whether to also return noise-normalized difference image. get_noise : bool, optional Whether to also return difference image noise model. get_kernel : bool, optional Whether to also return convolution kernel used in the subtraction. get_header : bool, optional Whether to also return the FITS header from HOTPANTS output file. _workdir : str, optional If specified, all temporary files will be created in this directory, and will be kept intact after running HOTPANTS. May be used for debugging. _tmpdir : str, optional If specified, all temporary files will be created in a dedicated directory (that will be deleted after running the executable) inside this path. _exe : str, optional Full path to HOTPANTS executable. If not provided, the code tries to locate it automatically in your :envvar:`PATH`. verbose : bool or callable, optional Whether to show verbose messages during the run of the function or not. Returns ------- numpy.ndarray or list The difference image and, optionally, other kinds of images as requested by the ``get_*`` options above. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Find the binary binname = None if _exe is not None: # Check user-provided binary path, and fail if not found if os.path.isfile(_exe): binname = _exe else: # Find HOTPANTS binary in common paths for exe in ['hotpants']: binname = shutil.which(exe) if binname is not None: break if binname is None: log("Can't find HOTPANTS binary") return None # else: # log("Using HOTPANTS binary at", binname) if mask is None: mask = ~np.isfinite(image) else: mask = mask | ~np.isfinite(image) if template_mask is None: template_mask = ~np.isfinite(template) else: template_mask = template_mask | ~np.isfinite(template) imin, imax = np.min(image[~mask]), np.max(image[~mask]) tmin, tmax = np.min(template[~template_mask]), np.max(template[~template_mask]) # As HOTPANTS uses inclusive checks for high values, let's extend their range just a bit imax += 0.01 * (imax - imin) tmax += 0.01 * (tmax - tmin) # Logic from https://arxiv.org/pdf/1608.01006.pdf imin = np.median(image[~mask]) - 10 * mad_std(image[~mask]) tmin = np.median(template[~template_mask]) - 10 * mad_std(template[~template_mask]) _nan = 1e-30 # workdir = _workdir if _workdir is not None else tempfile.mkdtemp(prefix='hotpants', dir=_tmpdir) image = image.copy() image[~np.isfinite(image)] = np.nanmedian(image) # _nan # image[mask] = _nan imagename = os.path.join(workdir, 'image.fits') fits.writeto(imagename, image, overwrite=True) imaskname = os.path.join(workdir, 'imask.fits') fits.writeto(imaskname, mask.astype(np.uint16), overwrite=True) template = template.copy() template[~np.isfinite(template)] = np.nanmedian(template) # _nan # template[template_mask] = _nan templatename = os.path.join(workdir, 'template.fits') fits.writeto(templatename, template, overwrite=True) tmaskname = os.path.join(workdir, 'tmask.fits') fits.writeto(tmaskname, template_mask.astype(np.uint16), overwrite=True) outname = os.path.join(workdir, 'diff.fits') stampname = os.path.join(workdir, 'stamps.reg') if not ny: ny = nx params = { 'inim': imagename, 'tmplim': templatename, 'outim': outname, 'savexy': stampname, # Masks 'imi': imaskname, 'tmi': tmaskname, # Lower and upper valid values for image 'il': imin, 'iu': imax, # Lower and upper valid values for template 'tl': tmin, 'tu': tmax, 'tuk': tmax, # Limit used for kernel estimation, why not the same as 'tu'?.. # Normalize result to input image 'n': 'i', # Return all possible image planes as a result 'allm': True, # Convolve template image 'c': 't', # Add kernel info to the output 'hki': True, # Number of sub-regions 'nrx': nx, 'nry': ny, # Disable positional variance of the kernel and background 'ko': 0, 'bgo': 0, 'v': 2, } # Error map for input image if err is not None: if type(err) is bool and err == True: err, _ = _build_err_map(image, mask, gain=image_gain, verbose_log=log) if hasattr(err, 'shape'): errname = os.path.join(workdir, 'err.fits') params['ini'] = errname fits.writeto(errname, err, overwrite=True) if template_err is not None: if type(template_err) is bool and template_err == True: template_err, _ = _build_err_map( template, template_mask, gain=template_gain, verbose_log=log ) if hasattr(template_err, 'shape'): terrname = os.path.join(workdir, 'terr.fits') params['tni'] = terrname fits.writeto(terrname, template_err, overwrite=True) if image_fwhm is not None and template_fwhm is not None: # Recommended logic for sigma_match if image_fwhm > template_fwhm: sigma_match = np.sqrt(image_fwhm**2 - template_fwhm**2) / 2.35 sigma_match = max(1.0, sigma_match) params['ng'] = [ 3, 6, 0.5 * sigma_match, 4, 1.0 * sigma_match, 2, 2.0 * sigma_match, ] elif image_fwhm < template_fwhm: sigma_match = np.sqrt(template_fwhm**2 - image_fwhm**2) / 2.35 sigma_match = max(1.0, sigma_match) params['ng'] = [ 3, 6, 0.5 * sigma_match, 4, 1.0 * sigma_match, 2, 2.0 * sigma_match, ] # TODO: switch to convolve-image mode?.. if image_fwhm is not None: # Logic from https://arxiv.org/pdf/1608.01006.pdf suggests 2.5 and 6 here params['r'] = int(np.ceil(image_fwhm * rel_r)) params['rss'] = int(np.ceil(image_fwhm * rel_rss)) if image_gain is not None: params['ig'] = image_gain if template_gain is not None: params['tg'] = template_gain if extra is not None: params.update(extra) # Set the stamp locations from detected objects if obj is not None and len(obj): log('Using %d external positions for substamps' % len(obj)) xyname = os.path.join(workdir, 'objects.xy') np.savetxt(xyname, [[_['x'] + 1, _['y'] + 1] for _ in Table(obj)], fmt='%.1f') params['ssf'] = xyname # Build command line command = [binname] for key in params.keys(): if params[key] is None: pass elif type(params[key]) == bool: if params[key]: command.append('-' + key) else: pass else: if type(params[key]) == str: # Quote string if necessary value = shlex.quote(params[key]) elif hasattr(params[key], '__len__'): # List or array, for multi-valued arguments value = " ".join([str(_) for _ in params[key]]) else: value = str(params[key]) command.append('-' + key + ' ' + value) command = " ".join(command) if not verbose: command += " >/dev/null 2>/dev/null" log('Will run HOTPANTS like that:') log(command) if os.path.exists(outname): os.unlink(outname) # Run the command os.system(command) if not os.path.exists(outname): log('HOTPANTS run failed') return None elif not np.any(fits.getdata(outname, 0)): log('HOTPANTS failed to perform subtraction') return None else: log('HOTPANTS run succeeded') # Difference image result = [fits.getdata(outname, 0)] header = fits.getheader(outname) if os.path.exists(stampname): with open(stampname, 'r') as f: lines = f.readlines() lines = lines[1:] header['NSTAMPS'] = len(lines) log('%d stamps used' % len(lines)) if get_convolved: # Convolved image conv = fits.getdata(outname, 1) result.append(conv) if get_scaled: # Noise-scaled difference image sdiff = fits.getdata(outname, 2) result.append(sdiff) if get_noise: # Noise image noise = fits.getdata(outname, 3) result.append(noise) if get_kernel: # Kernel information kernel = Table.read(outname, format='fits', hdu=4) result.append(kernel) if get_header: # Header with metadata result.append(header) if _workdir is None: shutil.rmtree(workdir) if len(result) == 1: return result[0] else: return result
[docs] def run_zogy( image, template, mask=None, template_mask=None, err=None, template_err=None, image_psf=None, template_psf=None, image_gain=None, template_gain=None, image_fwhm=None, template_fwhm=None, scale=None, psf_clean=0, dx=0.25, dy=0.25, nx=1, ny=None, overlap=50, fit_scale=True, fit_shift=True, good_regions=None, image_obj=None, template_obj=None, get_psf=False, get_Fpsf=False, nthreads=0, verbose=False, **kwargs, ): """Image subtraction using Zackay–Ofek–Gal-Yam (ZOGY) algorithm, as described in http://dx.doi.org/10.3847/0004-637X/830/1/27 The science and template images should be already aligned astrometrically, and roughly matched photometrically, i.e. have similar flux scale. There is an option to fit for the small difference in flux scales (`fit_scale`), which is enabled by default. Also, optionally (if `fit_shift` is set) it may fit for a small systematic positional error (sub-pixel shift) of the template. Required inputs are science and template images (plus optionally their masks), their noise maps and PSFs, as well as their relative flux scale. If noise maps are not provided, they will be constructed from the input images using estimated background noise and gain values for source contributions. If PSFs are not specified, they will be also estimated from the images using :func:`stdpipe.psf.run_psfex`. Flux normalization for PSF estimation also requires FWHMs of the images, which may either be specified directly, or estimated from the images too. Relative template image flux scaling may either be provided as `scale` argument, or it may be estimated from the lists of objects detected in both images. These may either be provided directly (`image_obj` and `template_obj`), or derived during PSF estimation. These objects will also be used for restricting the scale/shift fitting to their vicinities only. Alternatively, the map of good regions may be directly provided as `good_regions` argument. The image will be optionally split into given number (`nx`x`ny`) of sub-images, subtracted independently, and then the results will be stitched back. Sub-images will overlap by `overlap` pixels. Parameters ---------- image : numpy.ndarray Input science image as a NumPy array. template : numpy.ndarray Input template image, should have the same shape as the science image. mask : numpy.ndarray, optional Science image mask as a boolean array (True values will be masked). template_mask : numpy.ndarray, optional Template image mask as a boolean array (True values will be masked). err : numpy.ndarray, optional Science image noise map (expected RMS of every pixel). If not set, the code will try to build the noise map directly from the image and ``image_gain`` parameter. template_err : numpy.ndarray, optional Template image noise map. If not set, the code will try to build the noise map directly from the template and ``template_gain`` parameter. image_gain : float, optional Gain of science image, to be used for construction of the noise map. template_gain : float, optional Gain of template image, to be used for construction of the noise map. image_psf : dict, optional PSF for science image. If not provided, will be estimated by :func:`stdpipe.psf.run_psfex`. template_psf : dict, optional PSF for template image. If not provided, will be estimated by :func:`stdpipe.psf.run_psfex`. image_fwhm : float, optional FWHM of science image, to be used for estimated PSF normalization. template_fwhm : float, optional FWHM of template image, to be used for estimated PSF normalization. scale : float, optional Template image flux scale relative to science image. psf_clean : float, optional If non-zero, will clean the PSF regions with relative values less than this factor. dx : float, optional Astrometric uncertainty (sigma) in x coordinate. dy : float, optional Astrometric uncertainty (sigma) in y coordinate. nx : int, optional Number of sub-images in ``x`` direction. ny : int, optional Number of sub-images in ``y`` direction. overlap : int, optional If set, defines how much sub-images will overlap, in pixels. fit_scale : bool, optional If set, will fit for the difference in flux scales between template and science images. fit_shift : bool, optional If set, will also fit for the sub-pixel shift between template and science images. good_regions : numpy.ndarray, optional If set, this boolean map will be used to restrict scale/shift fitting to only these regions. image_obj : astropy.table.Table, optional List of objects detected in science image. If provided, they will be used for placing good regions. template_obj : astropy.table.Table, optional List of objects detected in template image. If provided, they will be used for deriving template flux scale. get_psf : bool, optional If set, will also return the PSF of the difference image. get_Fpsf : bool, optional If set, will also return the optimal PSF photometry image and its error. nthreads : int, optional Set the number of threads to use for FFTW routines (0 for auto). verbose : bool or callable, optional Whether to show verbose messages during the run of the function or not. **kwargs Extra parameters to be passed to :func:`stdpipe.psf.run_psfex` for PSF estimation. Returns ------- list The list of the difference image and other subtraction results: - D: Subtracted image - S_corr: Corrected subtracted image - P_D: PSF of subtracted image, if :code:`get_psf=True` - Fpsf: optimal PSF photometry image, if :code:`get_Fpsf=True` - Fpsf_err: optimal PSF photometry error image, if :code:`get_Fpsf=True` """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if mask is None: mask = ~np.isfinite(image) else: mask = mask | ~np.isfinite(image) if template_mask is None: template_mask = ~np.isfinite(template) else: template_mask = template_mask | ~np.isfinite(template) # Build noise models and subtract background if err is None: U_N_full, image_bg = _build_err_map(image, mask, gain=image_gain, verbose_log=log) SN = image_bg.globalrms else: image_bg = sep.Background(image, mask) U_N_full = err SN = np.median(err) if template_err is None: U_R_full, template_bg = _build_err_map( template.astype(np.double), template_mask, gain=template_gain, verbose_log=log ) SR = template_bg.globalrms else: template_bg = sep.Background(template.astype(np.double), template_mask) U_R_full = template_err SR = np.median(template_err) N_full = image - image_bg.back() R_full = template - template_bg.back() # Set to zero the regions where we have no data N_full[~np.isfinite(image)] = 0 R_full[~np.isfinite(template)] = 0 # Artificially assign large uncertainty to the regions we set to zero U_N_full[~np.isfinite(image)] = np.nanmax(U_N_full) U_R_full[~np.isfinite(template)] = np.nanmax(U_R_full) # Estimate image PSF, if not specified if image_psf is None: log("Estimating image PSF") # TODO: should we overwrite image_obj if it was set by the user?.. image_psf, image_obj = psf.run_psfex( image, mask=mask, gain=image_gain, aper=2.0 * image_fwhm if image_fwhm else None, verbose=verbose, get_obj=True, **kwargs, ) # Estimate template PSF, if not specified if template_psf is None: log("Estimating template PSF") # TODO: should we overwrite image_obj if it was set by the user?.. template_psf, template_obj = psf.run_psfex( template, mask=template_mask, gain=template_gain, aper=2.0 * template_fwhm if template_fwhm else None, verbose=verbose, get_obj=True, **kwargs, ) # "Good" regions for flux scale refinement if good_regions is not None and isinstance(good_regions, np.ndarray): good_regions = good_regions.astype(np.bool) log("Will use user-provided regions for scale/shift fitting") else: good_regions = np.zeros_like(image, dtype=bool) # Guess initial relative flux scale if image_obj and template_obj: # Use half FWHM radius for matching sr = 0.5 * max(image_psf['fwhm'], template_psf['fwhm']) iidx, tidx, _ = astrometry.planar_match( image_obj['x'], image_obj['y'], template_obj['x'], template_obj['y'], sr ) if len(iidx): # Exclude all flagged objects fidx = (image_obj['flags'][iidx] & 0x100) == 0 fidx &= (template_obj['flags'][tidx] & 0x100) == 0 if np.sum(fidx) > 5: # Compute photometric zero point shift between two object lists ZP = template_obj['mag'][tidx][fidx] - image_obj['mag'][iidx][fidx] dZP = np.hypot(image_obj['magerr'][iidx][fidx], template_obj['magerr'][tidx][fidx]) X = np.ones_like(ZP).T # Weighted robust regression C = sm.RLM(ZP / dZP, (X.T / dZP).T).fit() scale = 10 ** (-0.4 * C.params[0]) log("Estimated initial template flux scale Fr = %.3g" % scale) else: log("Not enough matched objects to guess template flux scale") if len(iidx) > 0 and (fit_scale or fit_shift) and np.sum(good_regions) == 0: # Use matched objects (they are all unflagged) to define good regions size = int(6 * sr) # 3*FWHM N = 0 for i in range(len(iidx)): x0 = int(image_obj['x'][iidx][i]) y0 = int(image_obj['y'][iidx][i]) # Exclude objects too close to the edges if ( x0 < size or x0 >= image.shape[1] - size or y0 < size or y0 >= image.shape[0] - size ): continue # Exclude sub-regions containing masked pixels if ( np.sum((mask | template_mask)[y0 - size : y0 + size, x0 - size : x0 + size]) == 0 ): good_regions[y0 - size : y0 + size, x0 - size : x0 + size] = True N += 1 if N > 0: log(N, "regions selected for scale/shift fitting") else: log("Initial template flux scale Fr = %.3g" % scale) if (fit_scale or fit_shift) and np.sum(good_regions) == 0: good_regions = np.ones_like(good_regions, dtype=bool) log("Will fit for scale/shift using whole image") # Split the image into sub-images to later stitch it back if not ny: ny = nx D_full = np.zeros_like(image, dtype=np.double) S_corr_full = np.zeros_like(image, dtype=np.double) D_full = np.zeros_like(image, dtype=np.double) Fpsf_full = np.zeros_like(image, dtype=np.double) Fpsf_err_full = np.zeros_like(image, dtype=np.double) for i, x0, y0, N, R, U_N, U_R, good_reg in pipeline.split_image( N_full, R_full, U_N_full, U_R_full, good_regions, nx=nx, ny=ny, overlap=overlap, get_index=True, get_origin=True, verbose=True if nx > 1 and ny > 1 else False, ): # PSF stamps at the centers of sub-image P_N_small = psf.get_psf_stamp( image_psf, x=x0 + N.shape[1] / 2, y=y0 + N.shape[0] / 2, dx=0, dy=0, normalize=True ) P_R_small = psf.get_psf_stamp( template_psf, x=x0 + N.shape[1] / 2, y=y0 + N.shape[0] / 2, dx=0, dy=0, normalize=True ) if psf_clean: # Set to zero the wings with relative amplitude less than psf_clean factor P_N_small[P_N_small < psf_clean * np.max(P_N_small)] = 0 P_N_small /= np.sum(P_N_small) P_R_small[P_R_small < psf_clean * np.max(P_R_small)] = 0 P_R_small /= np.sum(P_R_small) Fn = 1 # Fixed, so that the difference will be in science image flux scale Fr = scale if scale else 1 # Will be optionally adjusted later # Place PSF at the center of image with same size as new / reference P_N = np.zeros_like(N) P_R = np.zeros_like(R) idxN = tuple( [ slice( int(N.shape[0] / 2) - int(P_N_small.shape[0] / 2), int(N.shape[0] / 2) + int(P_N_small.shape[0] / 2) + 1, ), slice( int(N.shape[1] / 2) - int(P_N_small.shape[1] / 2), int(N.shape[1] / 2) + int(P_N_small.shape[1] / 2) + 1, ), ] ) idxR = tuple( [ slice( int(R.shape[0] / 2) - int(P_R_small.shape[0] / 2), int(R.shape[0] / 2) + int(P_R_small.shape[0] / 2) + 1, ), slice( int(R.shape[1] / 2) - int(P_R_small.shape[1] / 2), int(R.shape[1] / 2) + int(P_R_small.shape[1] / 2) + 1, ), ] ) P_N[idxN] = P_N_small P_R[idxR] = P_R_small # Shift the PSF to the origin so it will not introduce a shift P_N = fft.fftshift(P_N) P_R = fft.fftshift(P_R) # Take all the Fourier Transforms N_hat = fft.fft2(N, threads=nthreads) R_hat = fft.fft2(R, threads=nthreads) P_N_hat = fft.fft2(P_N, threads=nthreads) P_R_hat = fft.fft2(P_R, threads=nthreads) if fit_scale or fit_shift: # Estimate beta=Fn/Fr and shift using Equations 37-39 def fn(par): Fn = 1 Fr = par[0] D_hat_den = np.sqrt( SN**2 * Fr**2 * np.abs(P_R_hat**2) + SR**2 * Fn**2 * np.abs(P_N_hat**2) ) D_hat_n = P_R_hat * N_hat / D_hat_den D_hat_r = P_N_hat * R_hat / D_hat_den if len(par) > 1: D_hat_r = ndimage.fourier_shift(D_hat_r, par[1:]) DD_n = np.real(fft.ifft2(D_hat_n, threads=0)) DD_r = np.real(fft.ifft2(D_hat_r, threads=0)) return (Fr * DD_n - Fn * DD_r)[good_reg].flatten() if np.sum(good_reg) > 10: if fit_scale and fit_shift: log('Fitting for flux scale difference and sub-pixel template shift') C = least_squares( fn, [scale, 0, 0], bounds=((0, -0.5, -0.5), (np.inf, 0.5, 0.5)), verbose=0 ) elif fit_scale: log('Fitting for flux scale difference') C = least_squares(fn, [scale], bounds=(0, np.inf), verbose=0) else: log('Fitting for sub-pixel template shift') C = least_squares( fn, [scale, 0, 0], bounds=((scale * 0.9999, -0.5, -0.5), (scale * 1.0001, 0.5, 0.5)), verbose=0, ) if C.success: if fit_scale: Fr = C.x[0] log('Template flux scale Fr = %.3g' % Fr) if len(C.x) > 1: log('Shift is dy = %.2f dx = %.2f' % (C.x[1], C.x[2])) R_hat = ndimage.fourier_shift(R_hat, C.x[1:]) else: log('Fitting failed') else: log('Not enough good pixels for fitting') # Fourier Transform of Difference Image (Equation 13) D_hat_num = Fr * P_R_hat * N_hat - Fn * P_N_hat * R_hat D_hat_den = np.sqrt(SN**2 * Fr**2 * np.abs(P_R_hat**2) + SR**2 * Fn**2 * np.abs(P_N_hat**2)) D_hat = D_hat_num / D_hat_den # Flux-based zero point (Equation 15) FD = Fr * Fn / np.sqrt(SN**2 * Fr**2 + SR**2 * Fn**2) # Difference image corrected for correlated noise D = np.real(fft.ifft2(D_hat, threads=nthreads)) / FD # Fourier Transform of PSF of Subtraction Image (Equation 14) P_D_hat = Fr * Fn * P_R_hat * P_N_hat / FD / D_hat_den # PSF of Subtraction Image D P_D = np.real(fft.ifft2(P_D_hat, threads=nthreads)) P_D = fft.ifftshift(P_D) P_D = P_D[idxN] # Fourier Transform of Score Image (Equation 17) S_hat = FD * D_hat * np.conj(P_D_hat) # Score Image S = np.real(fft.ifft2(S_hat, threads=nthreads)) # Now start calculating Scorr matrix (including all noise terms) # Start out with source noise # Sigma to variance V_N = U_N**2 V_R = U_R**2 # Fourier Transform of variance images V_N_hat = fft.fft2(V_N, threads=nthreads) V_R_hat = fft.fft2(V_R, threads=nthreads) # Equation 28 kr_hat = Fr * Fn**2 * np.conj(P_R_hat) * np.abs(P_N_hat**2) / (D_hat_den**2) kr = np.real(fft.ifft2(kr_hat, threads=nthreads)) # Equation 29 kn_hat = Fn * Fr**2 * np.conj(P_N_hat) * np.abs(P_R_hat**2) / (D_hat_den**2) kn = np.real(fft.ifft2(kn_hat, threads=nthreads)) # Noise in New Image: Equation 26 V_S_N = np.real(fft.ifft2(V_N_hat * fft.fft2(kn**2, threads=nthreads), threads=nthreads)) # Noise in Reference Image: Equation 27 V_S_R = np.real(fft.ifft2(V_R_hat * fft.fft2(kr**2, threads=nthreads), threads=nthreads)) # Astrometric Noise # Equation 31 S_N = np.real(fft.ifft2(kn_hat * N_hat, threads=nthreads)) dSNdx = S_N - np.roll(S_N, 1, axis=1) dSNdy = S_N - np.roll(S_N, 1, axis=0) # Equation 30 V_ast_S_N = dx**2 * dSNdx**2 + dy**2 * dSNdy**2 # Equation 33 S_R = np.real(fft.ifft2(kr_hat * R_hat)) dSRdx = S_R - np.roll(S_R, 1, axis=1) dSRdy = S_R - np.roll(S_R, 1, axis=0) # Equation 32 V_ast_S_R = dx**2 * dSRdx**2 + dy**2 * dSRdy**2 # Calculate Scorr S_corr = S / np.sqrt(V_S_N + V_S_R + V_ast_S_N + V_ast_S_R) # PSF photometry (Equations 41-43) F_S = np.sum(Fn**2 * Fr**2 * np.abs(P_N_hat**2) * np.abs(P_R_hat**2) / (D_hat_den**2)) F_S /= S.shape[1] * S.shape[1] # divide by the number of pixels due to FFT normalization Fpsf = S / F_S # optimal PSF photometry, alpha in Equation 41 Fpsf_err = np.sqrt(V_S_N + V_S_R) / F_S # Stitch the piece back to the tapestry D_full[y0 : y0 + N.shape[0], x0 : x0 + N.shape[1]] = D S_corr_full[y0 : y0 + N.shape[0], x0 : x0 + N.shape[1]] = S_corr Fpsf_full[y0 : y0 + N.shape[0], x0 : x0 + N.shape[1]] = Fpsf Fpsf_err_full[y0 : y0 + N.shape[0], x0 : x0 + N.shape[1]] = Fpsf_err result = [D_full, S_corr_full] if get_psf: result += [P_D] if get_Fpsf: result += [Fpsf_full, Fpsf_err_full] return result
def _build_err_map(image, mask, gain=None, verbose_log=None): """Build a noise map from image and gain, following the HOTPANTS/ZOGY convention. Combines background RMS from SEP with source Poisson noise contribution. Returns ------- err : (ny, nx) noise map (per-pixel standard deviation) bg : sep.Background object """ log = verbose_log or (lambda *a, **k: None) bg = sep.Background(image.astype(np.double), mask) err = bg.rms() # Background noise level # Source Poisson noise contribution source = np.abs(image - bg.back()) if gain is not None and gain > 0: source /= gain else: source[:] = 0 err = np.sqrt(err**2 + source) log('Noise model: background %.1f, rms %.2f ADU' % (bg.globalback, bg.globalrms)) return err, bg
[docs] def run_sfft( image, template, mask=None, template_mask=None, err=None, template_err=None, image_gain=None, template_gain=None, kernel_shape=(7, 7), kernel_poly_order=2, bg_poly_order=2, flux_poly_order=1, flux_penalty=1e3, ridge=1e-6, sigma_clip=3.0, max_iter=5, obj=None, obj_size=None, get_convolved=False, get_scaled=False, get_noise=False, get_kernel=False, verbose=False, ): """Image subtraction using SFFT with spatially varying kernel. Wrapper around :func:`stdpipe.sfft.solve` that handles noise model construction, difference image noise propagation, and follows the same interface conventions as :func:`run_hotpants` and :func:`run_zogy`. The noise model for both science and template images may be provided directly (as arrays), or built automatically from the images using background estimation and gain values (same approach as HOTPANTS). If ``err`` or ``template_err`` is set to ``True``, the routine builds a noise map from the image using SEP background estimation and the corresponding gain parameter. Parameters ---------- image : numpy.ndarray Input science image as a NumPy array. template : numpy.ndarray Input template image, same shape as science image. mask : numpy.ndarray, optional Science image mask (True = bad). template_mask : numpy.ndarray, optional Template image mask (True = bad). err : numpy.ndarray or bool, optional Science image noise map (per-pixel RMS). If True, built from the image and ``image_gain``. template_err : numpy.ndarray or bool, optional Template image noise map. If True, built from the template and ``template_gain``. image_gain : float, optional Gain of science image in e-/ADU. template_gain : float, optional Gain of template image in e-/ADU. kernel_shape : tuple of int, optional (ky, kx) kernel size, must be odd. Default (7, 7). kernel_poly_order : int, optional Polynomial order for kernel spatial variation. Default 2. bg_poly_order : int, optional Polynomial order for differential background. Default 2. flux_poly_order : int, optional Polynomial order for kernel-sum (flux scale) constraint. Default 1. flux_penalty : float, optional Penalty weight for kernel-sum constraint. Default 1e3. ridge : float, optional Tikhonov regularization. Default 1e-6. sigma_clip : float, optional Sigma threshold for outlier rejection. Default 3.0. max_iter : int, optional Maximum sigma-clipping iterations. Default 5. obj : astropy.table.Table, optional List of detected objects with ``x``, ``y`` columns. If provided, kernel fitting is restricted to rectangular regions around these objects, analogous to HOTPANTS stamp placement. This concentrates the fit on high-S/N regions and can improve kernel quality. obj_size : int, optional Half-size in pixels of the fitting region around each object. If None (default), derived from the median ``fwhm`` column of *obj* (``3 × median(fwhm)``), or 15 pixels if ``fwhm`` is not available. get_convolved : bool, optional Whether to also return the convolved template. get_scaled : bool, optional Whether to also return noise-normalized difference. get_noise : bool, optional Whether to also return the difference image noise map. get_kernel : bool, optional Whether to also return the :class:`~stdpipe.sfft.SFFTResult` object containing kernel coefficients and fit metadata. verbose : bool or callable, optional Whether to show verbose messages. Returns ------- numpy.ndarray or list The difference image, or a list ``[diff, ...]`` if any ``get_*`` option is set. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # --- Masks --- if mask is None: mask = ~np.isfinite(image) else: mask = np.asarray(mask, dtype=bool) | ~np.isfinite(image) if template_mask is None: template_mask = ~np.isfinite(template) else: template_mask = np.asarray(template_mask, dtype=bool) | ~np.isfinite(template) combined_mask = mask | template_mask # --- Build noise models --- image_f = np.asarray(image, dtype=np.float64) template_f = np.asarray(template, dtype=np.float64) if err is True: log('Building noise model from the image') # SEP cannot handle NaN — replace with median for background estimation image_clean = image_f.copy() image_clean[~np.isfinite(image_clean)] = np.nanmedian(image_clean) err, _ = _build_err_map(image_clean, mask, gain=image_gain, verbose_log=log) elif err is not None: err = np.asarray(err, dtype=np.float64) if template_err is True: log('Building noise model from the template') template_clean = template_f.copy() template_clean[~np.isfinite(template_clean)] = np.nanmedian(template_clean) template_err, _ = _build_err_map( template_clean, template_mask, gain=template_gain, verbose_log=log, ) elif template_err is not None: template_err = np.asarray(template_err, dtype=np.float64) # Build combined weight map for SFFT solver # SFFT uses inverse-variance weighting internally when err is provided # We pass the science err to weight the fit sfft_err = err # may be None for uniform weighting # --- Build fitting mask from object positions --- fitting_mask = combined_mask if obj is not None and len(obj): obj = Table(obj) # Determine region half-size if obj_size is not None: size = int(obj_size) elif 'fwhm' in obj.colnames: size = int(np.ceil(3 * np.nanmedian(obj['fwhm']))) else: size = 15 # Build good-regions map good_regions = np.zeros(image.shape[:2], dtype=bool) ny_img, nx_img = image.shape[:2] n_used = 0 for row in obj: x0 = int(row['x']) y0 = int(row['y']) # Skip objects too close to edges if x0 < size or x0 >= nx_img - size or y0 < size or y0 >= ny_img - size: continue # Skip regions containing masked pixels region_mask = combined_mask[y0 - size : y0 + size, x0 - size : x0 + size] if np.any(region_mask): continue good_regions[y0 - size : y0 + size, x0 - size : x0 + size] = True n_used += 1 if n_used > 0: log('Using %d object regions (%d px half-size) for kernel fitting' % (n_used, size)) fitting_mask = combined_mask | ~good_regions else: log('No usable object regions, fitting on whole image') # --- Run SFFT --- log('Running SFFT subtraction') sfft_result = sfft_module.solve( image_f, template_f, mask=fitting_mask, err=sfft_err, kernel_shape=kernel_shape, kernel_poly_order=kernel_poly_order, bg_poly_order=bg_poly_order, flux_poly_order=flux_poly_order, flux_penalty=flux_penalty, ridge=ridge, sigma_clip=sigma_clip, max_iter=max_iter, verbose=verbose, ) diff = sfft_result.diff result = [diff] # --- Convolved template --- if get_convolved: # model = convolved_template + background # Extract background contribution to get convolved template alone bg_model = sfft_module.evaluate_background( sfft_result, *np.meshgrid(np.arange(diff.shape[1]), np.arange(diff.shape[0])), diff.shape, ) convolved = sfft_result.model - bg_model result.append(convolved) # --- Noise map for difference image --- noise = None if get_noise or get_scaled: noise = _compute_diff_noise(sfft_result, err, template_err, combined_mask, log) if get_noise: result.append(noise) # --- Noise-scaled difference --- if get_scaled: scaled = np.zeros_like(diff) good = (noise > 0) & np.isfinite(noise) & ~combined_mask scaled[good] = diff[good] / noise[good] result.append(scaled) # --- Kernel info --- if get_kernel: result.append(sfft_result) if len(result) == 1: return result[0] else: return result
def _compute_diff_noise(sfft_result, err, template_err, mask, log): """Compute per-pixel noise map for the SFFT difference image. diff = science - model, where model = Σ_α a_α(x,y) · R_α(x,y) + bg(x,y) var_diff(x,y) = var_sci(x,y) + Σ_α a_α²(x,y) · var_tmpl(x+dy, x+dx) If template_err is not available, assumes the template contributes no noise (deep template limit) and returns just the science noise. """ ny, nx = sfft_result.diff.shape # Science noise contribution if err is not None and hasattr(err, 'shape'): var_sci = err**2 else: # Estimate from difference image residuals rms = sfft_result.rms if np.isfinite(sfft_result.rms) else mad_std(sfft_result.diff[~mask]) var_sci = np.full((ny, nx), rms**2, dtype=np.float64) log('No science err provided, using constant RMS = %.2f for noise map' % rms) # Template noise contribution (convolved through the kernel) if template_err is not None and hasattr(template_err, 'shape'): log('Computing convolved template noise contribution') var_tmpl = template_err**2 # var_convolved = Σ_α a_α²(x,y) · var_tmpl_shifted(x,y) ky, kx = sfft_result.kernel_shape offsets = sfft_module._kernel_offsets(ky, kx) n_kpoly = sfft_module._n_poly(sfft_result.kernel_poly_order) x_norm, y_norm = sfft_module._norm_coords(ny, nx) poly_k = sfft_module._poly_terms_2d(x_norm, y_norm, sfft_result.kernel_poly_order) # Compute a_α(x,y) maps = kernel_coeffs[α] @ poly_k kernel_coeffs = sfft_result.kernel_coeffs # (n_kernel, n_kpoly) poly_k_flat = poly_k.reshape(n_kpoly, ny * nx) with np.errstate(over='ignore', invalid='ignore', divide='ignore'): a_maps = kernel_coeffs @ poly_k_flat # (n_kernel, npix) var_conv = np.zeros(ny * nx, dtype=np.float64) for alpha, (dy, dx) in enumerate(offsets): shifted_var = sfft_module._shift_image(var_tmpl, dy, dx).ravel() var_conv += a_maps[alpha] ** 2 * shifted_var var_conv = var_conv.reshape(ny, nx) var_total = var_sci + var_conv else: var_total = var_sci noise = np.sqrt(np.clip(var_total, 0, None)) return noise