Source code for stdpipe.pipeline

"""
Module containing higher-level pipeline building blocks, wrapping together lower-level
functionality of STDPipe modules.
"""

import os
import numpy as np
import itertools
from copy import deepcopy

from astropy.wcs import WCS
from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.time import Time
from astropy.table import Table

import sep
import astroscrappy

from . import photometry
from . import astrometry
from . import catalogs
from . import psf
from . import utils
from . import cutouts


[docs] def make_mask( image, header=None, saturation=None, external_mask=None, mask_cosmics=False, gain=None, verbose=True, ): """Make a basic mask for an image. The mask is a boolean bitmap with ``True`` values marking regions to be excluded from further processing. The following regions are masked: - pixels with undefined or non-finite (inf or nan) values - regions outside the usable area defined by the ``DATASEC`` or ``TRIMSEC`` header keyword - pixels above the saturation limit, if provided - pixels set in the external mask, if provided - cosmic rays, if requested If ``saturation=True``, the saturation level is estimated from the image as ``median + 0.95 * (max - median)``. Parameters ---------- image : ndarray 2D image array to mask. header : astropy.io.fits.Header, optional FITS header; used to read ``DATASEC`` / ``TRIMSEC`` keywords. saturation : float or bool, optional Saturation level in ADU. If ``True``, estimated automatically from the image. external_mask : ndarray, optional Boolean mask to OR with the created mask. mask_cosmics : bool, optional If True, detect and mask cosmic rays using the LACosmic algorithm. gain : float, optional Detector gain in e-/ADU, used for cosmic-ray masking. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- ndarray of bool Boolean mask with ``True`` marking excluded pixels. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None mask = ~np.isfinite(image) if header is not None: # DATASEC or TRIMSEC for kw in ['TRIMSEC', 'DATASEC']: if kw in header: x1, x2, y1, y2 = utils.parse_det(header.get(kw)) log('Masking the region outside of %s = %s' % (kw, header.get(kw))) mask1 = np.ones_like(mask) mask1[y1 : y2 + 1, x1 : x2 + 1] = False mask |= mask1 if saturation is not None: if type(saturation) is bool and saturation == True: saturation = 0.05 * np.nanmedian(image) + 0.95 * np.nanmax(image) # med + 0.95(max-med) log('Masking pixels above saturation level %.1f ADU' % saturation) mask |= image >= saturation if external_mask is not None: external_mask = external_mask.astype(bool) log('Applying external mask with %d masked pixels' % np.sum(external_mask)) mask |= external_mask if mask_cosmics: # We will use custom noise model for astroscrappy as we do not know whether # the image is background-subtracted already, or how it was flatfielded bg = sep.Background(image, mask=mask) rms = bg.rms() var = rms**2 if gain: var += np.abs(image - bg.back()) / gain cmask, cimage = astroscrappy.detect_cosmics( image, mask, verbose=verbose, invar=var.astype(np.float32), gain=gain if gain else 1.0, satlevel=saturation if saturation else np.max(image), cleantype='medmask', ) log( 'Done masking cosmics, %d (%.1f%%) pixels masked' % (np.sum(cmask), 100 * np.sum(cmask) / cmask.shape[0] / cmask.shape[1]) ) mask |= cmask log( '%d (%.1f%%) pixels masked in total' % (np.sum(mask), 100 * np.sum(mask) / mask.shape[0] / mask.shape[1]) ) return mask
[docs] def refine_astrometry( obj, cat, sr=10 / 3600, wcs=None, order=0, cat_col_mag='V', cat_col_mag_err=None, cat_col_ra='RAJ2000', cat_col_dec='DEJ2000', cat_col_ra_err='e_RAJ2000', cat_col_dec_err='e_DEJ2000', n_iter=3, use_photometry=True, min_matches=5, method='quadhash', update=True, refine_residual_field=False, residual_field_kwargs=None, verbose=False, **kwargs, ): """Higher-level astrometric refinement routine. Dispatches to quad-hash, SCAMP, astropy, or astrometrynet fitting depending on ``method``. Parameters ---------- obj : astropy.table.Table List of objects on the frame, must contain at least ``x``, ``y``, and ``flux`` columns. cat : astropy.table.Table or str Reference astrometric catalogue. sr : float, optional Matching radius in degrees. wcs : astropy.wcs.WCS, optional Initial WCS solution. order : int, optional Polynomial order for SIP or PV distortion solution. Default 0 for TAN; 2 is recommended for ``quadhash``. cat_col_mag : str, optional Catalogue column name for magnitude. cat_col_mag_err : str, optional Catalogue column name for magnitude error. cat_col_ra : str, optional Catalogue column name for Right Ascension. cat_col_dec : str, optional Catalogue column name for Declination. cat_col_ra_err : str, optional Catalogue column name for Right Ascension error. cat_col_dec_err : str, optional Catalogue column name for Declination error. n_iter : int, optional Number of iterations for Python-based matching. use_photometry : bool, optional Use photometry-assisted matching in Python-based methods. min_matches : int, optional Minimum number of good matches required. method : str, optional Fitting method. One of ``'quadhash'`` (default, 2–7× more accurate), ``'scamp'``, ``'astropy'``, or ``'astrometrynet'``. update : bool, optional If True, update ``ra`` and ``dec`` columns in ``obj`` in-place. When combined with ``refine_residual_field=True``, ``x``/``y`` of ``obj`` are also corrected (their residual against the catalogue is subtracted) before ``ra``/``dec`` are recomputed from the refined positions. refine_residual_field : bool, optional If True, after the WCS fit, fit a smooth (dx, dy) correction field on the matched ``obj``--``cat`` residuals using :func:`stdpipe.astrometry.refine_positions_from_catalog` and (if ``update=True``) apply the inverse correction to ``obj['x']``, ``obj['y']`` in place. residual_field_kwargs : dict, optional Forwarded to :func:`refine_positions_from_catalog`. Common keys: ``backend``, ``image_shape``, ``grid_shape``, ``min_per_cell``, ``smooth_sigma``. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. **kwargs All other keyword arguments are passed to the respective refinement function. Returns ------- astropy.wcs.WCS or None Refined astrometric solution, or None on failure. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None log( 'Astrometric refinement using %.1f arcsec radius, %s matching and %s WCS fitting' % (sr * 3600, 'photometric' if use_photometry else 'simple positional', method) ) if type(cat) == str: log('Using %d objects and catalogue %s' % (len(obj), cat)) else: log('Using %d objects and %d catalogue stars' % (len(obj), len(cat))) if wcs is not None: obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) if method == 'quadhash': # Quad-hash pattern matching (default, hopefully more accurate than SCAMP) wcs = astrometry.refine_wcs_quadhash( obj, cat, wcs=wcs, sr=sr, order=order, cat_col_ra=cat_col_ra, cat_col_dec=cat_col_dec, cat_col_mag=cat_col_mag, update=update, verbose=verbose, **kwargs, ) elif method == 'scamp': # Fall-through to SCAMP-specific variant wcs = astrometry.refine_wcs_scamp( obj, cat, sr=sr, wcs=wcs, order=order, cat_col_mag=cat_col_mag, cat_col_mag_err=cat_col_mag_err, cat_col_ra=cat_col_ra, cat_col_dec=cat_col_dec, cat_col_ra_err=cat_col_ra_err, cat_col_dec_err=cat_col_dec_err, update=update, verbose=verbose, **kwargs, ) else: wcs = _refine_astrometry_iterative( obj, cat, sr=sr, wcs=wcs, order=order, cat_col_mag=cat_col_mag, cat_col_mag_err=cat_col_mag_err, cat_col_ra=cat_col_ra, cat_col_dec=cat_col_dec, n_iter=n_iter, use_photometry=use_photometry, min_matches=min_matches, method=method, update=update, log=log, verbose=verbose, **kwargs, ) if refine_residual_field and wcs is not None: _apply_residual_field_correction( obj, cat, wcs, sr=sr, cat_col_ra=cat_col_ra, cat_col_dec=cat_col_dec, update=update, residual_field_kwargs=residual_field_kwargs or {}, log=log, ) return wcs
def _apply_residual_field_correction( obj, cat, wcs, *, sr, cat_col_ra, cat_col_dec, update, residual_field_kwargs, log, ): """Re-match obj↔cat positionally, fit a smooth residual field, and optionally apply the inverse correction to ``obj['x']``, ``obj['y']`` in place. Helper for :func:`refine_astrometry`.""" obj_x = np.asarray(obj['x'], float) obj_y = np.asarray(obj['y'], float) obj_ra, obj_dec = wcs.all_pix2world(obj_x, obj_y, 0) cat_ra = np.asarray(cat[cat_col_ra], float) cat_dec = np.asarray(cat[cat_col_dec], float) # All obj↔cat pairs within sr (degrees). spherical_match returns every # pair, so dedupe to one nearest catalogue match per object. oidx_all, cidx_all, dist_all = astrometry.spherical_match( obj_ra, obj_dec, cat_ra, cat_dec, sr=sr, ) if len(oidx_all) < 50: log('Residual field skipped: only %d positional matches' % len(oidx_all)) return order = np.argsort(dist_all) oidx_sorted = oidx_all[order] cidx_sorted = cidx_all[order] _, first = np.unique(oidx_sorted, return_index=True) oidx_unique = oidx_sorted[first] cidx_unique = cidx_sorted[first] if len(oidx_unique) < 50: log('Residual field skipped: only %d unique positional matches' % len(oidx_unique)) return accepted = np.zeros(len(obj), dtype=bool) accepted[oidx_unique] = True cidx_aligned = np.full(len(obj), -1, dtype=int) cidx_aligned[oidx_unique] = cidx_unique match = { 'idx': accepted, 'oidx': np.arange(len(obj)), 'cidx': cidx_aligned, } correct, info = astrometry.refine_positions_from_catalog( match, obj, cat, wcs, cat_col_ra=cat_col_ra, cat_col_dec=cat_col_dec, **residual_field_kwargs, ) log( 'Residual field on %d matches: median |Δ| %.3f%.3f px, ' 'q90 %.3f%.3f px' % ( info['n_used'], info['raw_median_dr_pix'], info['corrected_median_dr_pix'], info['raw_q90_dr_pix'], info['corrected_q90_dr_pix'], ) ) if update: # Apply the inverse correction at observed positions: each # detection is shifted by -(dx, dy) so that the corrected x/y # lie on the WCS-consistent grid. The residual field was fit at # predicted positions, but the typical residual is ≪ FWHM so # evaluating at observed positions is fine. cdx, cdy = correct.smoother(obj_x, obj_y) obj['x'] = obj_x - cdx obj['y'] = obj_y - cdy obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) def _refine_astrometry_iterative( obj, cat, *, sr, wcs, order, cat_col_mag, cat_col_mag_err, cat_col_ra, cat_col_dec, n_iter, use_photometry, min_matches, method, update, log, verbose, **kwargs, ): """Iterative Python/astrometrynet refinement loop, factored out of :func:`refine_astrometry` so the dispatch path can be linear.""" for iter in range(n_iter): if use_photometry: # Matching involving photometric information cat_magerr = cat[cat_col_mag_err] if cat_col_mag_err is not None else None m = photometry.match( obj['ra'], obj['dec'], obj['mag'], obj['magerr'], obj['flags'], cat[cat_col_ra], cat[cat_col_dec], cat[cat_col_mag], cat_magerr=cat_magerr, sr=sr, verbose=verbose, ) if m is None or not m: log('Photometric match failed, cannot refine WCS') return None elif np.sum(m['idx']) < min_matches: log('Too few (%d) good photometric matches, cannot refine WCS' % np.sum(m['idx'])) return None else: log( 'Iteration %d: %d matches, %.1f arcsec rms' % (iter, np.sum(m['idx']), np.std(3600 * m['dist'][m['idx']])) ) wcs = astrometry.refine_wcs_simple( obj[m['oidx']][m['idx']], cat[m['cidx']][m['idx']], order=order, match=False, method=method, verbose=verbose, ) else: # Simple positional matching wcs = astrometry.refine_wcs_simple( obj, cat, order=order, sr=sr, match=True, method=method, verbose=verbose, **kwargs ) if update: obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) return wcs
[docs] def filter_transient_candidates( obj, sr=None, pixscale=None, fwhm=None, time=None, obj_col_ra='ra', obj_col_dec='dec', cat=None, cat_col_ra='RAJ2000', cat_col_dec='DEJ2000', vizier=[], skybot=True, ned=False, flagged=True, flagmask=0x7F00, col_id=None, vizier_checker_fn=None, get_candidates=True, remove=True, verbose=False, ): """Higher-level transient candidate filtering routine. Optionally filters out the following classes of objects: - flagged ones (``obj['flags'] != 0``) - positionally coincident with stars from a provided reference catalogue - positionally coincident with stars from Vizier catalogues - positionally and temporally coincident with Solar System objects from SkyBoT - positionally and temporally coincident with NED objects The original object list is never modified; a filtered or annotated copy is returned. If ``get_candidates=False``, returns only a boolean mask of objects surviving all filters. If ``get_candidates=True`` and ``remove=False``, all objects are returned but annotated with ``candidate_*`` columns indicating which filters matched each object, plus a ``candidate_good`` column that is ``True`` for objects surviving all filters. Parameters ---------- obj : astropy.table.Table Input object list. sr : float, optional Matching radius in degrees. Defaults to half FWHM (if ``pixscale`` is set) or 1 arcsec. pixscale : float, optional Pixel scale in degrees/pixel. Used to compute the default matching radius. fwhm : float, optional FWHM in pixels. Estimated from unflagged objects if not provided. time : astropy.time.Time or datetime.datetime, optional Observation time; required for SkyBoT cross-matching. obj_col_ra : str, optional Column name for object Right Ascension. obj_col_dec : str, optional Column name for object Declination. cat : astropy.table.Table, optional Reference catalogue for spatial cross-matching. cat_col_ra : str, optional Column name for catalogue Right Ascension. cat_col_dec : str, optional Column name for catalogue Declination. vizier : list of str, optional Vizier catalogue identifiers (or short names) to cross-match against. skybot : bool, optional If True, cross-match with SkyBoT Solar System object positions. ned : bool, optional If True, cross-match with NED database entries. flagged : bool, optional If True, filter out objects where ``(obj['flags'] & flagmask) != 0``. flagmask : int, optional Bitmask applied to ``obj['flags']`` for flag filtering. col_id : str, optional Column name for a unique object identifier. A ``stdpipe_id`` column is created automatically if not specified. vizier_checker_fn : callable, optional Function ``fn(obj, xcat, catname) -> bool array`` to apply additional conditions on Vizier cross-matches before they are considered true matches. get_candidates : bool, optional If True (default), return the filtered/annotated object list. If False, return only a boolean mask over the original list. remove : bool, optional If True (default), remove filtered entries from the returned list. If False, keep all objects and add ``candidate_*`` annotation columns. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- astropy.table.Table or ndarray of bool Filtered/annotated copy of the object list, or (if ``get_candidates=False``) a boolean mask of the same length as the input. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if fwhm is None: idx = obj['flags'] == 0 idx &= obj['magerr'] < 0.05 # S/N > 20 fwhm = np.median(obj['fwhm'][idx]) if sr is None: if pixscale is not None: # Matching radius of half FWHM sr = np.median(fwhm * pixscale) / 2 else: # Fallback value of 1 arcsec, should be sensible for most catalogues sr = 1 / 3600 if col_id is None: col_id = 'stdpipe_id' if col_id not in obj.keys(): obj_in = obj obj = obj.copy() obj[col_id] = np.arange(len(obj)) else: obj_in = obj if remove == False: # We are asked to just mark the matched candidates, not remove from the result # So let's create a copy of objects list where we may freely add extra columns without touching the original obj_in = obj_in.copy() log( 'Candidate filtering routine started with %d initial candidates and %.1f arcsec matching radius' % (len(obj), sr * 3600) ) cand_idx = np.ones(len(obj), dtype=bool) # Object flags if flagged: # Filter out flagged objects (saturated, cosmics, blends, etc) idx = (obj['flags'] & flagmask) == 0 if remove == False: obj_in['candidate_flagged'] = ~idx cand_idx &= idx log(np.sum(cand_idx), 'of them are unflagged') # Reference catalogue if cat is not None and remove == False: obj_in['candidate_refcat'] = False if cat is not None and np.any(cand_idx): m = astrometry.spherical_match( obj[obj_col_ra], obj[obj_col_dec], cat[cat_col_ra], cat[cat_col_dec], sr ) cand_idx[m[0]] = False if remove == False: obj_in['candidate_refcat'] = False obj_in['candidate_refcat'][m[0]] = True log(np.sum(cand_idx), 'of them are not matched with reference catalogue') # Vizier catalogues for catname in vizier or []: if remove == False: if 'candidate_vizier_' + catname not in obj_in.keys(): obj_in['candidate_vizier_' + catname] = False if not np.any(cand_idx): break xcat = catalogs.xmatch_objects( obj[cand_idx][[col_id, obj_col_ra, obj_col_dec]], catname, sr, col_ra=obj_col_ra, col_dec=obj_col_dec, ) if xcat is not None and len(xcat): if callable(vizier_checker_fn): # Pass matched results through user-supplied checker xobj = obj[[np.where(obj[col_id] == _)[0][0] for _ in xcat[col_id]]] xidx = vizier_checker_fn(xobj, xcat, catname) xcat = xcat[xidx] cand_idx &= ~np.in1d(obj[col_id], xcat[col_id]) if remove == False: obj_in['candidate_vizier_' + catname][np.in1d(obj[col_id], xcat[col_id])] = True log( np.sum(cand_idx), 'remains after matching with', catalogs.catalogs.get(catname, {'name': catname})['name'], ) # SkyBoT if skybot and np.any(cand_idx): if time is None and 'time' in obj.keys(): time = obj['time'] if remove == False: if 'candidate_skybot' not in obj_in.keys(): obj_in['candidate_skybot'] = False if time is not None: xcat = catalogs.xmatch_skybot( obj[cand_idx], time=time, col_ra=obj_col_ra, col_dec=obj_col_dec, col_id=col_id ) if xcat is not None and len(xcat): cand_idx &= ~np.in1d(obj[col_id], xcat[col_id]) if remove == False: obj_in['candidate_skybot'][np.in1d(obj[col_id], xcat[col_id])] = True log(np.sum(cand_idx), 'remains after matching with SkyBot') # NED if ned and np.any(cand_idx): if remove == False: if 'candidate_ned' not in obj_in.keys(): obj_in['candidate_ned'] = False xcat = catalogs.xmatch_ned( obj[cand_idx], sr, col_id=col_id, col_ra=obj_col_ra, col_dec=obj_col_dec, ) if xcat is not None and len(xcat): cand_idx &= ~np.in1d(obj[col_id], xcat[col_id]) if remove == False: obj_in['candidate_ned'][np.in1d(obj[col_id], xcat[col_id])] = True log(np.sum(cand_idx), 'remains after matching with NED') if remove == False: obj_in['candidate_good'] = False obj_in['candidate_good'][cand_idx] = True log('%d candidates remaining after filtering' % len(obj[cand_idx])) if get_candidates: if remove: # Return filtered list return obj_in[cand_idx].copy() else: # Return full list with additional columns return obj_in else: # Return just indices return cand_idx
[docs] def calibrate_photometry( obj, cat, sr=None, pixscale=None, order=0, bg_order=None, obj_col_mag='mag', obj_col_mag_err='magerr', obj_col_ra='ra', obj_col_dec='dec', obj_col_flags='flags', obj_col_x='x', obj_col_y='y', cat_col_mag='R', cat_col_mag_err=None, cat_col_mag1=None, cat_col_mag2=None, cat_col_ra='RAJ2000', cat_col_dec='DEJ2000', update=True, verbose=False, **kwargs, ): """Higher-level photometric calibration routine. Wraps :func:`stdpipe.photometry.match` with convenient defaults for typical tabular data. Parameters ---------- obj : astropy.table.Table Table of detected objects. cat : astropy.table.Table Reference photometric catalogue. sr : float, optional Matching radius in degrees. Defaults to half the median FWHM in sky units if ``pixscale`` is provided, otherwise 1 arcsec. pixscale : float, optional Pixel scale in degrees/pixel; used to compute the default matching radius. order : int, optional Spatial polynomial order for the zero-point model (0 = constant). bg_order : int or None, optional Spatial polynomial order for an additive flux background term. None disables this term. obj_col_mag : str, optional Column name for object instrumental magnitude. obj_col_mag_err : str, optional Column name for object magnitude error. obj_col_ra : str, optional Column name for object Right Ascension. obj_col_dec : str, optional Column name for object Declination. obj_col_flags : str, optional Column name for object flags. obj_col_x : str, optional Column name for object x coordinate. obj_col_y : str, optional Column name for object y coordinate. cat_col_mag : str, optional Column name for catalogue magnitude. cat_col_mag_err : str, optional Column name for catalogue magnitude error. cat_col_mag1 : str, optional First catalogue magnitude column for the color term (e.g. ``'B'``). cat_col_mag2 : str, optional Second catalogue magnitude column for the color term (e.g. ``'R'``). cat_col_ra : str, optional Column name for catalogue Right Ascension. cat_col_dec : str, optional Column name for catalogue Declination. update : bool, optional If True, add ``mag_calib`` and ``mag_calib_err`` columns to ``obj`` in-place with the calibrated magnitude and its error. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. **kwargs Additional keyword arguments passed to :func:`stdpipe.photometry.match`. Returns ------- dict or None Dictionary with photometric results as returned by :func:`stdpipe.photometry.match`, or None on failure. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if sr is None: if pixscale is not None: # Matching radius of half FWHM sr = np.median(obj['fwhm'] * pixscale) / 2 else: # Fallback value of 1 arcsec, should be sensible for most catalogues sr = 1 / 3600 log( 'Performing photometric calibration of %d objects vs %d catalogue stars' % (len(obj), len(cat)) ) log( 'Using %.1f arcsec matching radius, %s magnitude and spatial order %d' % (sr * 3600, cat_col_mag, order) ) if cat_col_mag1 and cat_col_mag2: log('Using (%s - %s) color for color term' % (cat_col_mag1, cat_col_mag2)) if cat_col_mag1 and cat_col_mag2: color = cat[cat_col_mag1] - cat[cat_col_mag2] else: color = None if cat_col_mag_err: cat_magerr = cat[cat_col_mag_err] else: cat_magerr = None m = photometry.match( obj[obj_col_ra], obj[obj_col_dec], obj[obj_col_mag], obj[obj_col_mag_err], obj[obj_col_flags], cat[cat_col_ra], cat[cat_col_dec], cat[cat_col_mag], cat_magerr=cat_magerr, sr=sr, cat_color=color, obj_x=obj[obj_col_x] if obj_col_x else None, obj_y=obj[obj_col_y] if obj_col_y else None, spatial_order=order, bg_order=bg_order, verbose=verbose, **kwargs, ) if m: log('Photometric calibration finished successfully.') # if m['color_term']: # log('Color term is %.2f' % m['color_term']) m['cat_col_mag'] = cat_col_mag if cat_col_mag1 and cat_col_mag2: m['cat_col_mag1'] = cat_col_mag1 m['cat_col_mag2'] = cat_col_mag2 if update: obj['mag_calib'] = obj[obj_col_mag] + m['zero_fn']( obj[obj_col_x], obj[obj_col_y], obj[obj_col_mag] ) obj['mag_calib_err'] = np.hypot( obj[obj_col_mag_err], m['zero_fn'](obj[obj_col_x], obj[obj_col_y], obj[obj_col_mag], get_err=True), ) else: log('Photometric calibration failed') return m
[docs] def make_random_stars( width=None, height=None, shape=None, nstars=100, minflux=1, maxflux=100000, edge=0, wcs=None, verbose=False, ): """Generate a table of random stars. Coordinates are distributed uniformly with ``edge <= x < width - edge`` and ``edge <= y < height - edge``. Fluxes are log-uniform between ``minflux`` and ``maxflux``. Parameters ---------- width : int, optional Width of the image in pixels. height : int, optional Height of the image in pixels. shape : tuple of int, optional Image shape ``(height, width)``; used instead of ``width`` and ``height`` if set. nstars : int, optional Number of artificial stars to generate. minflux : float, optional Minimum star flux in ADU. maxflux : float, optional Maximum star flux in ADU. edge : int, optional Minimum distance to image edges for star placement. wcs : astropy.wcs.WCS, optional If provided, sky coordinates ``ra`` and ``dec`` are added to the catalogue. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- astropy.table.Table Catalogue with columns ``x``, ``y``, ``flux``, ``mag``, and optionally ``ra``, ``dec`` if ``wcs`` is provided. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if (width is None or height is None) and shape is not None: height, width = shape cat = { 'x': np.random.uniform(edge, width - 1 - edge, nstars), 'y': np.random.uniform(edge, height - edge, nstars), 'flux': 10 ** np.random.uniform(np.log10(minflux), np.log10(maxflux), nstars), } cat = Table(cat) if wcs is not None and wcs.celestial: cat['ra'], cat['dec'] = wcs.all_pix2world(cat['x'], cat['y'], 0) else: cat['ra'], cat['dec'] = np.nan, np.nan cat['mag'] = -2.5 * np.log10(cat['flux']) return cat
[docs] def place_random_stars( image, psf_model, nstars=100, minflux=1, maxflux=100000, gain=None, saturation=65535, edge=0, wcs=None, verbose=False, ): """Randomly place artificial stars into an image in-place. Stars are added on top of the existing image content. Poissonian noise is applied according to ``gain``, and pixel values are clipped at ``saturation``. Coordinates are uniformly distributed; fluxes are log-uniform. Parameters ---------- image : ndarray 2D image array; modified in-place. psf_model : dict PSF model structure as returned by :func:`stdpipe.psf.run_psfex`. nstars : int, optional Number of artificial stars to inject. minflux : float, optional Minimum star flux in ADU. maxflux : float, optional Maximum star flux in ADU. gain : float, optional Detector gain in e-/ADU; used to add Poissonian noise to each injected star. saturation : float, optional Saturation level in ADU; injected pixels above this are clipped. edge : int, optional Minimum distance to image edges for star placement. wcs : astropy.wcs.WCS, optional If provided, sky coordinates ``ra`` and ``dec`` are added to the catalogue. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- astropy.table.Table Catalogue of injected stars as returned by :func:`make_random_stars`, with columns ``x``, ``y``, ``flux``, ``mag``, and optionally ``ra``, ``dec``. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None cat = make_random_stars( shape=image.shape, nstars=nstars, minflux=minflux, maxflux=maxflux, edge=edge, wcs=wcs, verbose=verbose, ) for _ in cat: psf.place_psf_stamp(image, psf_model, _['x'], _['y'], flux=_['flux'], gain=gain) if saturation is not None: image[image > saturation] = saturation return cat
[docs] def split_sub_fn(x1, y1, dx1, dy1, *args, get_origin=False, **kwargs): """ """ result = [] if get_origin: result += [x1, y1] # Let's find WCS as we may need it later for converting sky coordinates wcs = None for arg in args + tuple(kwargs.values()): if isinstance(arg, WCS): wcs = arg break def handle_arg_fn(arg): if isinstance(arg, np.ndarray): # Image return cutouts.crop_image(arg, x1, y1, dx1, dy1) if isinstance(arg, fits.Header): # FITS header subheader = arg.copy() subheader['NAXIS1'] = dx1 subheader['NAXIS2'] = dy1 # Adjust the WCS keywords if present if 'CRPIX1' in subheader and 'CRPIX2' in subheader: subheader['CRPIX1'] -= x1 subheader['CRPIX2'] -= y1 # Crop position inside original frame subheader['CROP_X1'] = x1 subheader['CROP_X2'] = x1 + dx1 subheader['CROP_Y1'] = y1 subheader['CROP_Y2'] = y1 + dy1 return subheader if isinstance(arg, WCS): # WCS wcs1 = arg.deepcopy() # FIXME: is there any more 'official' way of shifting the WCS? wcs1.wcs.crpix[0] -= x1 wcs1.wcs.crpix[1] -= y1 wcs1 = WCS(wcs1.to_header(relax=True)) return wcs1 if isinstance(arg, Table): # Table table = arg.copy() x, y = None, None if 'x' in table.colnames and 'y' in table.colnames: x, y = table['x'].copy(), table['y'].copy() table['x'] -= x1 table['y'] -= y1 elif 'ra' in table.colnames and 'dec' in table.colnames and wcs is not None: x, y = wcs.all_world2pix(table['ra'], table['dec'], 0) elif 'RAJ2000' in table.colnames and 'DEJ2000' in table.colnames and wcs is not None: x, y = wcs.all_world2pix(table['RAJ2000'], table['DEJ2000'], 0) if x is not None: idx = (x > x1) & (x < x1 + dx1) idx &= (y > y1) & (y < y1 + dy1) else: idx = np.ones(len(table), dtype=bool) return table[idx] if isinstance(arg, dict) and 'x0' in arg and 'y0' in arg: # PSF structure, but also maybe some other dict-based objects res = deepcopy(arg) res['x0'] -= x1 res['y0'] -= y1 return res return None for arg in args + tuple(kwargs.values()): result.append(handle_arg_fn(arg)) return result
[docs] def split_image( image, *args, nx=1, ny=None, overlap=0, xmin=None, xmax=None, ymin=None, ymax=None, get_index=False, get_origin=False, verbose=False, **kwargs, ): """Generator that splits an image into ``nx × ny`` sub-image blocks. Also optionally yields sub-setted copies of any additional images, FITS headers, WCS solutions, PSF models, catalogues, or object lists passed as positional or keyword arguments. FITS headers and WCS solutions are adjusted to reflect the sub-image astrometry. Tables are filtered to rows whose ``x``/``y``, ``ra``/``dec``, or ``RAJ2000``/``DEJ2000`` coordinates fall inside the sub-image. Sub-images may optionally overlap by ``overlap`` pixels in every direction, so that each part of the original image appears far from an edge in at least one block. Parameters ---------- image : ndarray 2D image to split. *args Additional images, headers, WCS objects, or tables to crop alongside ``image``. nx : int, optional Number of sub-images along the x axis. ny : int, optional Number of sub-images along the y axis. Defaults to ``nx``. overlap : int, optional Number of pixels by which adjacent blocks overlap. xmin, xmax : int, optional Horizontal extent of the region to split (defaults to full image width). ymin, ymax : int, optional Vertical extent of the region to split (defaults to full image height). get_index : bool, optional If True, prepend the sub-image index (0-based) to each yielded list. get_origin : bool, optional If True, include the sub-image origin ``(x1, y1)`` in each yielded list. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. **kwargs Additional images, headers, WCS objects, or tables to crop. Yields ------ list Each element is a list built from (in order): - sub-image index, if ``get_index=True`` - origin ``x1``, ``y1``, if ``get_origin=True`` - cropped image - cropped versions of each ``*args`` and ``**kwargs`` entry """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if not ny: ny = nx # Sub-region to split if xmin is None: xmin = 0 else: xmin = int(max(0, xmin)) if xmax is None: xmax = image.shape[1] else: xmax = int(min(image.shape[1], xmax)) if ymin is None: ymin = 0 else: ymin = int(max(0, ymin)) if ymax is None: ymax = image.shape[0] else: ymax = int(min(image.shape[0], ymax)) width = xmax - xmin height = ymax - ymin dx, dy = int(np.floor(width / nx)), int(np.floor(height / ny)) log( 'Will split the image (%dx%d) into %dx%d pieces with %dx%d pixels size and %d pix overlap' % (width, height, nx, ny, dx, dy, overlap) ) for i, (x0, y0) in enumerate( itertools.product(range(xmin, xmax - dx + 1, dx), range(ymin, ymax - dy + 1, dy)) ): # Make some overlap x1 = max(0, x0 - overlap) y1 = max(0, y0 - overlap) dx1 = min(x0 - x1 + dx + overlap, xmax - x1) dy1 = min(y0 - y1 + dy + overlap, ymax - y1) result = split_sub_fn(x1, y1, dx1, dy1, image, *args, get_origin=get_origin, **kwargs) if get_index: result = [i] + result log('Block %d: %d %d - %d %d' % (i, x1, y1, x1 + dx1, y1 + dy1)) yield result if len(result) > 1 else result[0]
[docs] def get_subimage_centered( image, *args, x0=None, y0=None, width=None, height=None, get_origin=False, verbose=False, **kwargs, ): """Crop a sub-image centered at a given pixel position. Also optionally crops any additional images, masks, headers, WCS solutions, PSF models, object lists, etc. passed as arguments. Behaviour is identical to :func:`split_image` for a single block. Unlike :func:`stdpipe.utils.crop_image_centered`, this function accepts explicit output ``width`` and ``height``. If the requested region extends beyond the image boundary, the returned sub-image will be smaller (no padding is applied). Parameters ---------- image : ndarray 2D image to crop. *args Additional images, headers, WCS objects, or tables to crop alongside ``image``. x0 : int or float Pixel x coordinate of the desired center in the original image. y0 : int or float Pixel y coordinate of the desired center in the original image. width : int Pixel width of the sub-image. height : int, optional Pixel height of the sub-image. Defaults to ``width``. get_origin : bool, optional If True, include the sub-image origin ``(x1, y1)`` in the returned list. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. **kwargs Additional images, headers, WCS objects, or tables to crop. Returns ------- list List containing: - origin ``x1``, ``y1``, if ``get_origin=True`` - cropped image - cropped versions of each ``*args`` and ``**kwargs`` entry """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if not height: height = width x1 = int(np.floor(x0 - 0.5 * width)) y1 = int(np.floor(y0 - 0.5 * height)) x2 = x1 + width - 1 y2 = y1 + height - 1 x1, x2 = max(0, x1), min(x2, image.shape[1] - 1) y1, y2 = max(0, y1), min(y2, image.shape[0] - 1) log( 'Will crop the %d %d - %d %d sub-image from original (%dx%d) image centered at %d %d' % (x1, y1, x2, y2, image.shape[1], image.shape[0], x0, y0) ) result = split_sub_fn( x1, y1, x2 - x1 + 1, y2 - y1 + 1, image, *args, get_origin=get_origin, **kwargs ) return result
[docs] def get_detection_limit(obj, sn=5, method='sn', verbose=True): """Estimate the detection limit magnitude. The object table must contain calibrated magnitudes and errors in ``mag_calib`` and ``mag_calib_err`` columns. Parameters ---------- obj : astropy.table.Table Table with calibrated objects. sn : float, optional S/N threshold defining the detection limit. method : str, optional Estimation method. Currently only ``'sn'`` (S/N vs magnitude extrapolation) is implemented. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- float or None Magnitude corresponding to the detection limit at the given S/N level, or None if estimation fails. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if method == 'sn': log('Estimating detection limit using S/N vs magnitude method') mag0 = photometry.get_detection_limit_sn( obj['mag_calib'], 1 / obj['magerr'], sn=sn, verbose=verbose ) elif method == 'bg': log('Estimating detection limit using background noise method') raise RuntimeError('Not implemented') if mag0 is not None: log('Detection limit at S/N=%g level is %.2f' % (sn, mag0)) else: log('Error estimating the detection limit!') return mag0