Source code for stdpipe.astrometry

import os, tempfile, shutil, shlex, re, warnings
import numpy as np

from astropy.wcs import WCS
from astropy.io import fits
from astropy.wcs.utils import fit_wcs_from_points
from astropy.coordinates import SkyCoord, search_around_sky
from astropy.table import Table
from astropy import units as u


from scipy.spatial import KDTree

from . import utils

# Put these to common namespace
from .astrometry_quad import refine_wcs_quadhash


[docs] def get_frame_center(filename=None, header=None, wcs=None, width=None, height=None, shape=None): """Returns image center RA, Dec, and angular radius in degrees. Parameters ---------- filename : str, optional Path to FITS file. header : astropy.io.fits.Header, optional FITS header containing WCS keywords. wcs : astropy.wcs.WCS, optional WCS object. width : int, optional Image width in pixels. Read from header if not provided. height : int, optional Image height in pixels. Read from header if not provided. shape : tuple of int, optional Image shape ``(height, width)``. Used if `width`/`height` not given. Returns ------- ra : float or None Right Ascension of frame center in degrees. dec : float or None Declination of frame center in degrees. sr : float or None Angular radius from center to corner in degrees. """ if not wcs: if header: wcs = WCS(header) elif filename: header = fits.getheader(filename, -1) wcs = WCS(header) if width is None or height is None: if header is not None: width = header['NAXIS1'] height = header['NAXIS2'] elif shape is not None: height, width = shape if not wcs or not wcs.is_celestial: return None, None, None ra1, dec1 = wcs.all_pix2world(0, 0, 0) ra0, dec0 = wcs.all_pix2world(width / 2, height / 2, 0) sr = spherical_distance(ra0, dec0, ra1, dec1) return ra0.item(), dec0.item(), sr.item()
[docs] def get_pixscale(wcs=None, filename=None, header=None): """Returns pixel scale of an image in degrees per pixel. Parameters ---------- wcs : astropy.wcs.WCS, optional WCS object. filename : str, optional Path to FITS file. header : astropy.io.fits.Header, optional FITS header containing WCS keywords. Returns ------- float Pixel scale in degrees per pixel. """ if not wcs: if header: wcs = WCS(header=header) elif filename: header = fits.getheader(filename, -1) wcs = WCS(header=header) return np.hypot(wcs.pixel_scale_matrix[0, 0], wcs.pixel_scale_matrix[0, 1])
[docs] def radectoxyz(ra, dec): ra_rad = np.deg2rad(ra) dec_rad = np.deg2rad(dec) xyz = np.array( ( np.cos(dec_rad) * np.cos(ra_rad), np.cos(dec_rad) * np.sin(ra_rad), np.sin(dec_rad), ) ) return xyz
[docs] def xyztoradec(xyz): ra = np.arctan2(xyz[1], xyz[0]) ra += 2 * np.pi * (ra < 0) dec = np.arcsin(xyz[2] / np.linalg.norm(xyz, axis=0)) return (np.rad2deg(ra), np.rad2deg(dec))
[docs] def spherical_distance(ra1, dec1, ra2, dec2): """Compute spherical distance between two points or sets of points. Parameters ---------- ra1 : float or array_like First point or set of points RA in degrees. dec1 : float or array_like First point or set of points Dec in degrees. ra2 : float or array_like Second point or set of points RA in degrees. dec2 : float or array_like Second point or set of points Dec in degrees. Returns ------- float or ndarray Spherical distance in degrees. """ x = np.sin(np.deg2rad((ra1 - ra2) / 2)) x *= x y = np.sin(np.deg2rad((dec1 - dec2) / 2)) y *= y z = np.cos(np.deg2rad((dec1 + dec2) / 2)) z *= z return np.rad2deg(2 * np.arcsin(np.sqrt(x * (z - y) + y)))
[docs] def spherical_match(ra1, dec1, ra2, dec2, sr=1 / 3600): """Positional match on the sphere for two lists of coordinates. Aimed to be a direct replacement for :func:`esutil.htm.HTM.match` method with :code:`maxmatch=0`. Parameters ---------- ra1 : array_like First set of points RA in degrees. dec1 : array_like First set of points Dec in degrees. ra2 : array_like Second set of points RA in degrees. dec2 : array_like Second set of points Dec in degrees. sr : float, optional Maximal acceptable pair distance to be considered a match, in degrees. Default is 1/3600 (1 arcsecond). Returns ------- idx1 : ndarray of int Indices into the first list for matched pairs. idx2 : ndarray of int Indices into the second list for matched pairs. dist : ndarray of float Pairwise distances in degrees. """ # Ensure that inputs are arrays, and drop units if any ra1 = np.array(np.atleast_1d(ra1)) dec1 = np.array(np.atleast_1d(dec1)) ra2 = np.array(np.atleast_1d(ra2)) dec2 = np.array(np.atleast_1d(dec2)) idx1, idx2, dist, _ = search_around_sky( SkyCoord(ra1, dec1, unit='deg'), SkyCoord(ra2, dec2, unit='deg'), sr * u.deg ) dist = dist.deg # convert to degrees return idx1, idx2, dist
[docs] def planar_match(x1, y1, x2, y2, sr=1): """Positional match on the plane for two lists of coordinates. Parameters ---------- x1 : array_like First set of points X coordinates. y1 : array_like First set of points Y coordinates. x2 : array_like Second set of points X coordinates. y2 : array_like Second set of points Y coordinates. sr : float, optional Maximal acceptable pair distance to be considered a match. Default is 1. Returns ------- idx1 : ndarray of int Indices into the first list for matched pairs. idx2 : ndarray of int Indices into the second list for matched pairs. dist : ndarray of float Pairwise distances. """ kd = KDTree(np.array([x2, y2]).T) idx1, idx2 = [], [] for i, js in enumerate(kd.query_ball_point(np.array([x1, y1]).T, sr)): for j in js: idx1.append(i) idx2.append(j) idx1 = np.array(idx1) idx2 = np.array(idx2) dist = np.hypot(x1[idx1] - x2[idx2], y1[idx1] - y2[idx2]) return idx1, idx2, dist
[docs] def get_objects_center(obj, col_ra='ra', col_dec='dec'): """ Returns the center RA, Dec, and radius in degrees for a cloud of objects on the sky. """ xyz = radectoxyz(obj[col_ra], obj[col_dec]) xyz0 = np.mean(xyz, axis=1) ra0, dec0 = xyztoradec(xyz0) sr0 = np.max(spherical_distance(ra0, dec0, obj[col_ra], obj[col_dec])) return ra0, dec0, sr0
[docs] def blind_match_objects( obj, order=2, update=False, sn=20, get_header=False, width=None, height=None, center_ra=None, center_dec=None, radius=None, scale_lower=None, scale_upper=None, scale_units='arcsecperpix', config=None, extra={}, _workdir=None, _tmpdir=None, _exe=None, verbose=False, ): """Thin wrapper for blind plate solving using local Astrometry.Net and a list of detected objects. It requires `solve-field` binary from Astrometry.Net and some index files to be locally available. Parameters ---------- obj : astropy.table.Table List of objects on the frame, must contain at least ``x``, ``y``, and ``flux`` columns. order : int, optional Order for the SIP spatial distortion polynomial. update : bool, optional If True, the object list will be updated in-place with correct ``ra`` and ``dec`` sky coordinates. sn : float, optional If provided, only objects with S/N exceeding this value will be used for matching. get_header : bool, optional If True, return the FITS header object instead of WCS solution. width : int, optional Image width in pixels, used for guessing the frame center. height : int, optional Image height in pixels, used for guessing the frame center. center_ra : float, optional Approximate center RA of the field in degrees. center_dec : float, optional Approximate center Dec of the field in degrees. radius : float, optional Search radius in degrees around the specified center. scale_lower : float, optional Lower limit for the pixel scale. scale_upper : float, optional Upper limit for the pixel scale. scale_units : str, optional Units for ``scale_lower``/``scale_upper``. One of ``arcsecperpix``, ``arcminwidth``, or ``degwidth``. config : str, optional Path to config file for ``solve-field``. extra : dict, optional Additional parameters to pass to ``solve-field`` binary. _workdir : str, optional If specified, all temporary files will be kept in this directory after the run. _tmpdir : str, optional If specified, temporary files will be created inside this path. _exe : str, optional Full path to ``solve-field`` executable. Auto-detected from :envvar:`PATH` if not provided. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- astropy.wcs.WCS or astropy.io.fits.Header or None Astrometric solution, or FITS header if ``get_header=True``, or None on failure. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Find the binary binname = None if _exe is not None: # Check user-provided binary path, and fail if not found if os.path.isfile(_exe): binname = _exe else: # Find it in standard paths - does it even go there on any system?.. binname = shutil.which('solve-field') if binname is None: for path in [ '.', '/usr/bin', '/usr/local/bin', '/opt/local/bin', '/usr/astrometry/bin', '/usr/local/astrometry/bin', '/opt/local/astrometry/bin', ]: for exe in ['solve-field']: if os.path.isfile(os.path.join(path, exe)): binname = os.path.join(path, exe) break if binname is None: log("Can't find Astrometry.Net binary") return None # else: # log("Using Astrometry.Net binary at", binname) # Sort objects according to decreasing flux aidx = np.argsort(-obj['flux']) # Filter out least-significant detections, if SN limit is specified if sn is not None and sn > 0: aidx = [_ for _ in aidx if obj['flux'][_] / obj['fluxerr'][_] > sn] if width is None: width = int(np.max(obj['x'])) if height is None: height = int(np.max(obj['y'])) workdir = ( _workdir if _workdir is not None else tempfile.mkdtemp(prefix='astrometry', dir=_tmpdir) ) columns = [ fits.Column(name='XIMAGE', format='1D', array=obj['x'][aidx] + 1), fits.Column(name='YIMAGE', format='1D', array=obj['y'][aidx] + 1), fits.Column(name='FLUX', format='1D', array=obj['flux'][aidx]), ] tbhdu = fits.BinTableHDU.from_columns(columns) objname = os.path.join(workdir, 'list.fits') tbhdu.writeto(objname, overwrite=True) tmpname = os.path.join(workdir, 'list.tmp') wcsname = os.path.join(workdir, 'list.wcs') opts = { 'x-column': 'XIMAGE', 'y-column': 'YIMAGE', 'sort-column': 'FLUX', 'width': width, 'height': height, 'crpix-center': True, # 'overwrite': True, 'no-plots': True, 'dir': workdir, 'verbose': True if verbose else False, } if config is not None: opts['config'] = config if order is not None and order > 0: opts['tweak-order'] = order else: opts['no-tweak'] = True if scale_lower is not None: opts['scale-low'] = scale_lower if scale_upper is not None: opts['scale-high'] = scale_upper if scale_units is not None: opts['scale-units'] = scale_units if center_ra is not None: opts['ra'] = center_ra if center_dec is not None: opts['dec'] = center_dec if radius is not None: opts['radius'] = radius opts.update(extra) wcs = None for iter in range(2): if os.path.exists(wcsname): shutil.move(wcsname, tmpname) opts['verify'] = tmpname # Build the command line command = binname + ' ' + shlex.quote(objname) + ' ' + utils.format_long_opts(opts) if not verbose: command += ' > /dev/null 2>/dev/null' log('Will run iteration %d of Astrometry.Net like that:' % (iter,)) log(command) res = os.system(command) if res == 0 and os.path.exists(wcsname): log('Successfully run iteration %d' % (iter,)) else: log('Error %s running Astrometry.Net' % res) if res == 0 and os.path.exists(wcsname): header = fits.getheader(wcsname) wcs = WCS(header) ra0, dec0, sr0 = get_frame_center(wcs=wcs, width=width, height=height) pixscale = get_pixscale(wcs=wcs) log( 'Got WCS solution with center at %.4f %.4f radius %.2f deg and pixel scale %.2f arcsec/pix' % (ra0, dec0, sr0, pixscale * 3600) ) if update and wcs: obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) if _workdir is None: shutil.rmtree(workdir) return wcs
[docs] def blind_match_astrometrynet( obj, order=2, update=False, sn=20, get_header=False, width=None, height=None, solve_timeout=600, api_key=None, center_ra=None, center_dec=None, radius=None, scale_lower=None, scale_upper=None, scale_units='arcsecperpix', **kwargs, ): """Thin wrapper for remote plate solving using Astrometry.Net and a list of detected objects. Most parameters are passed directly to ``astroquery.astrometrynet.AstrometryNet.solve_from_source_list``. API key may be provided as an argument or set in ``~/.astropy/config/astroquery.cfg``. Parameters ---------- obj : astropy.table.Table List of objects on the frame, must contain at least ``x``, ``y``, and ``flux`` columns. order : int, optional Order for the SIP spatial distortion polynomial. update : bool, optional If True, the object list will be updated in-place with correct ``ra`` and ``dec`` sky coordinates. sn : float, optional If provided, only objects with S/N exceeding this value will be used for matching. get_header : bool, optional If True, return the FITS header object instead of WCS solution. width : int, optional Image width in pixels, used for guessing the frame center. height : int, optional Image height in pixels, used for guessing the frame center. solve_timeout : float, optional Timeout in seconds to wait for the solution from the remote server. api_key : str, optional Astrometry.Net API key. If not set, must be configured in ``~/.astropy/config/astroquery.cfg``. center_ra : float, optional Approximate center RA of the field in degrees. center_dec : float, optional Approximate center Dec of the field in degrees. radius : float, optional Search radius in degrees around the specified center. scale_lower : float, optional Lower limit for the pixel scale. scale_upper : float, optional Upper limit for the pixel scale. scale_units : str, optional Units for ``scale_lower``/``scale_upper``. One of ``arcsecperpix``, ``arcminwidth``, or ``degwidth``. Returns ------- astropy.wcs.WCS or astropy.io.fits.Header or None Astrometric solution, or FITS header if ``get_header=True``, or None on failure. """ # Import required module - we postpone it until now to hide warnings for API key from astroquery.astrometry_net import AstrometryNet # Sort objects according to decreasing flux aidx = np.argsort(-obj['flux']) # Filter out least-significant detections, if SN limit is specified if sn is not None and sn > 0: aidx = [_ for _ in aidx if obj['flux'][_] / obj['fluxerr'][_] > sn] if width is None: width = int(np.max(obj['x'])) if height is None: height = int(np.max(obj['y'])) an = AstrometryNet() if api_key is not None: an.api_key = api_key try: header = an.solve_from_source_list( obj['x'][aidx] + 1, obj['y'][aidx] + 1, width, height, center_ra=center_ra, center_dec=center_dec, radius=radius, scale_lower=scale_lower, scale_upper=scale_upper, scale_units=scale_units, solve_timeout=solve_timeout, tweak_order=order, **kwargs, ) except: import traceback traceback.print_exc() header = None if header is not None: wcs = WCS(header) if update: obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) if get_header: return header else: return wcs return None
[docs] def refine_wcs_simple( obj, cat, order=2, match=True, sr=3 / 3600, update=False, cat_col_ra='RAJ2000', cat_col_dec='DEJ2000', method='astropy', _tmpdir=None, verbose=False, ): """Refine the WCS using detected objects and a reference catalogue. Parameters ---------- obj : astropy.table.Table List of detected objects with at least ``x``, ``y``, ``ra``, ``dec`` columns. cat : astropy.table.Table Reference astrometric catalogue. order : int, optional SIP polynomial order for distortion solution. match : bool, optional If True, perform nearest-neighbor matching between objects and catalogue. If False, assume they are already matched line by line. sr : float, optional Matching radius in degrees. Default is 3/3600 (3 arcseconds). update : bool, optional If True, update object sky coordinates in-place using the new WCS. cat_col_ra : str, optional Catalogue column name for Right Ascension. cat_col_dec : str, optional Catalogue column name for Declination. method : str, optional Fitting method. Either ``'astropy'`` or ``'astrometrynet'``. _tmpdir : str, optional If specified, temporary files will be created inside this path. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- astropy.wcs.WCS or None Refined WCS solution, or None on failure. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if match: # Perform simple nearest-neighbor matching within given radius oidx, cidx, dist = spherical_match( obj['ra'], obj['dec'], cat[cat_col_ra], cat[cat_col_dec], sr ) _obj = obj[oidx] _cat = cat[cidx] else: # Assume supplied objects and catalogue are already matched line by line _obj = obj _cat = cat wcs = None if method == 'astropy': wcs = fit_wcs_from_points( [_obj['x'], _obj['y']], SkyCoord(_cat[cat_col_ra], _cat[cat_col_dec], unit='deg'), sip_degree=order, ) elif method == 'astrometrynet': binname = None # Rough estimate of frame dimensions width = np.max(obj['x']) height = np.max(obj['y']) for path in ['.', '/usr/local', '/opt/local']: if os.path.isfile(os.path.join(path, 'astrometry', 'bin', 'fit-wcs')): binname = os.path.join(path, 'astrometry', 'bin', 'fit-wcs') break if binname: dirname = tempfile.mkdtemp(prefix='astrometry', dir=_tmpdir) columns = [ fits.Column(name='FIELD_X', format='1D', array=obj['x'] + 1), fits.Column(name='FIELD_Y', format='1D', array=obj['y'] + 1), fits.Column(name='INDEX_RA', format='1D', array=cat[cat_col_ra]), fits.Column(name='INDEX_DEC', format='1D', array=cat[cat_col_dec]), ] tbhdu = fits.BinTableHDU.from_columns(columns) filename = os.path.join(dirname, 'list.fits') wcsname = os.path.join(dirname, 'list.wcs') tbhdu.writeto(filename, overwrite=True) command = "%s -c %s -o %s -W %d -H %d -C -s %d" % ( binname, filename, wcsname, width, height, order, ) log('Running Astrometry.Net WCS fitter like that:') log(command) os.system(command) if os.path.isfile(wcsname): header = fits.getheader(wcsname) wcs = WCS(header) log('WCS fitter run succeeded') else: log('WCS fitter run failed') shutil.rmtree(dirname) else: log("Astrometry.Net fit-wcs binary not found") if wcs: if update: log('Updating object sky coordinates in-place') # Update the sky coordinates of objects using new wcs obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) else: log('WCS refinement failed') return wcs
[docs] def clear_wcs( header, remove_comments=False, remove_history=False, remove_underscored=False, copy=False, ): """Clear WCS-related keywords from a FITS header. Parameters ---------- header : astropy.io.fits.Header Header to operate on. remove_comments : bool, optional If True, also remove COMMENT keywords. remove_history : bool, optional If True, also remove HISTORY keywords. remove_underscored : bool, optional If True, also remove all keywords starting with underscore (often added by Astrometry.Net). copy : bool, optional If True, operate on a copy and leave the original unchanged. Returns ------- astropy.io.fits.Header Modified FITS header. """ if copy: header = header.copy() wcs_keywords = [ 'WCSAXES', 'CRPIX1', 'CRPIX2', 'PC1_1', 'PC1_2', 'PC2_1', 'PC2_2', 'CDELT1', 'CDELT2', 'CUNIT1', 'CUNIT2', 'CTYPE1', 'CTYPE2', 'CRVAL1', 'CRVAL2', 'LONPOLE', 'LATPOLE', 'RADESYS', 'EQUINOX', 'B_ORDER', 'A_ORDER', 'BP_ORDER', 'AP_ORDER', 'CD1_1', 'CD2_1', 'CD1_2', 'CD2_2', 'IMAGEW', 'IMAGEH', ] scamp_keywords = [ 'FGROUPNO', 'ASTIRMS1', 'ASTIRMS2', 'ASTRRMS1', 'ASTRRMS2', 'ASTINST', 'FLXSCALE', 'MAGZEROP', 'PHOTIRMS', 'PHOTINST', 'PHOTLINK', ] remove = [] for key in header.keys(): if key: is_delete = False if key in wcs_keywords: is_delete = True if key in scamp_keywords: is_delete = True if re.match('^(A|B|AP|BP)_\d+_\d+$', key): # SIP is_delete = True if re.match('^PV_?\d+_\d+$', key): # PV is_delete = True if key[0] == '_' and remove_underscored: is_delete = True if key == 'COMMENT' and remove_comments: is_delete = True if key == 'HISTORY' and remove_history: is_delete = True if is_delete: remove.append(key) for key in remove: header.remove(key, remove_all=True, ignore_missing=True) return header
[docs] def wcs_pv2sip(header, order=None, accuracy=1e-4): """Convert a WCS header from any projection to TAN-SIP representation. The original WCS is sampled on a dense pixel grid and SIP polynomial coefficients (A, B for the forward distortion and AP, BP for the reverse) are fitted to reproduce the same pixel→sky mapping via least squares. Works for any input projection (TPV, ZPN, ZEA, etc.). Parameters ---------- header : `~astropy.io.fits.Header` Input FITS header with WCS keywords. order : int or None SIP polynomial order. If ``None`` (default), the order is chosen automatically (2 through 6) to achieve *accuracy* on the fitting grid. accuracy : float Target accuracy in arcseconds for automatic order selection (default 1e-4, i.e. 0.1 mas). Returns ------- header : `~astropy.io.fits.Header` New header with TAN-SIP WCS (CTYPE ``RA---TAN-SIP``). """ header = header.copy() # Parse the original WCS before modifying the header wcs_orig = WCS(header) nx = int(header.get('NAXIS1', 1024)) ny = int(header.get('NAXIS2', 1024)) # Ensure CD matrix is present (convert PC+CDELT → CD) if 'CD1_1' not in header and 'PC1_1' in header: cdelt = [header.get('CDELT1'), header.get('CDELT2')] header['CD1_1'] = header.pop('PC1_1') * cdelt[0] header['CD2_1'] = header.pop('PC2_1') * cdelt[0] header['CD1_2'] = header.pop('PC1_2') * cdelt[0] header['CD2_2'] = header.pop('PC2_2') * cdelt[0] crpix1 = header['CRPIX1'] crpix2 = header['CRPIX2'] crval1 = header['CRVAL1'] crval2 = header['CRVAL2'] cd = np.array( [ [header['CD1_1'], header.get('CD1_2', 0)], [header.get('CD2_1', 0), header['CD2_2']], ] ) cd_inv = np.linalg.inv(cd) # Dense pixel grid ngrid = int(max(50, min(200, max(nx, ny) // 10))) gx = np.linspace(0, nx - 1, ngrid) gy = np.linspace(0, ny - 1, ngrid) xx, yy = np.meshgrid(gx, gy) xf, yf = xx.ravel(), yy.ravel() # Sky coordinates from the original WCS ra, dec = wcs_orig.all_pix2world(xf, yf, 0) good = np.isfinite(ra) & np.isfinite(dec) xf, yf, ra, dec = xf[good], yf[good], ra[good], dec[good] # TAN inverse projection: (RA, Dec) → (xi, eta) in degrees ra0 = np.radians(crval1) dec0 = np.radians(crval2) ra_r = np.radians(ra) dec_r = np.radians(dec) cos_dec = np.cos(dec_r) sin_dec = np.sin(dec_r) cos_dec0 = np.cos(dec0) sin_dec0 = np.sin(dec0) dra = ra_r - ra0 denom = sin_dec * sin_dec0 + cos_dec * cos_dec0 * np.cos(dra) xi = np.degrees(cos_dec * np.sin(dra) / denom) eta = np.degrees((sin_dec * cos_dec0 - cos_dec * sin_dec0 * np.cos(dra)) / denom) # Pixel offsets from CRPIX (0-based) u = xf - (crpix1 - 1) v = yf - (crpix2 - 1) # Target distorted pixel coords: CD_inv · (xi, eta) u_target = cd_inv[0, 0] * xi + cd_inv[0, 1] * eta v_target = cd_inv[1, 0] * xi + cd_inv[1, 1] * eta # SIP corrections: A(u, v) = u_target - u, B(u, v) = v_target - v du = u_target - u dv = v_target - v def _sip_terms(sip_order): """Return list of (p, q) exponent pairs for SIP order.""" pq = [] for total in range(2, sip_order + 1): for q in range(total + 1): pq.append((total - q, q)) return pq def _sip_basis(u, v, pq, scale): """Build normalized SIP polynomial basis matrix. Coordinates are divided by *scale* for numerical stability; coefficients are rescaled to raw-pixel convention afterwards. """ un, vn = u / scale, v / scale return np.column_stack([un**p * vn**q for p, q in pq]) def _fit_order(u, v, du, dv, sip_order, scale): """Fit SIP coefficients at a given order, return residual.""" pq = _sip_terms(sip_order) A_mat = _sip_basis(u, v, pq, scale) ca_n, _, _, _ = np.linalg.lstsq(A_mat, du, rcond=None) cb_n, _, _, _ = np.linalg.lstsq(A_mat, dv, rcond=None) # Evaluate residual in normalized space (numerically stable) res_u = du - A_mat @ ca_n res_v = dv - A_mat @ cb_n pix_scale = np.sqrt(np.abs(np.linalg.det(cd))) * 3600 max_res = max(np.max(np.abs(res_u)), np.max(np.abs(res_v))) max_res *= pix_scale # arcsec # Rescale to raw-pixel convention: coeff_raw = coeff_norm / scale^(p+q) ca = ca_n.copy() cb = cb_n.copy() for i, (p, q) in enumerate(pq): s = scale ** (p + q) ca[i] /= s cb[i] /= s return ca, cb, pq, max_res # Normalization scale for numerical stability scale = max(np.max(np.abs(u)), np.max(np.abs(v)), 1.0) if order is not None: ca, cb, pq, max_res = _fit_order(u, v, du, dv, order, scale) else: # Auto-select order: try increasing orders, pick the best one. # Cap at 6 — higher orders tend to become numerically unstable # when the distortion is inherently non-polynomial (e.g. ZPN). best_res = np.inf best_result = None best_order = 2 for try_order in range(2, 7): ca_t, cb_t, pq_t, max_res_t = _fit_order(u, v, du, dv, try_order, scale) if max_res_t < best_res: best_res = max_res_t best_result = (ca_t, cb_t, pq_t, max_res_t) best_order = try_order if max_res_t < accuracy: break ca, cb, pq, max_res = best_result order = best_order # Strip old distortion keywords for key in list(header.keys()): if key.startswith(('A_', 'B_', 'AP_', 'BP_', 'PV')): del header[key] # Write forward SIP coefficients header['CTYPE1'] = 'RA---TAN-SIP' header['CTYPE2'] = 'DEC--TAN-SIP' header['A_ORDER'] = order header['B_ORDER'] = order for i, (p, q) in enumerate(pq): if abs(ca[i]) > 1e-20: header['A_%d_%d' % (p, q)] = float(ca[i]) if abs(cb[i]) > 1e-20: header['B_%d_%d' % (p, q)] = float(cb[i]) # Fit reverse SIP (AP, BP): map distorted coords back to undistorted du_rev = u - u_target dv_rev = v - v_target pq_rev = _sip_terms(order) A_rev = _sip_basis(u_target, v_target, pq_rev, scale) cap, _, _, _ = np.linalg.lstsq(A_rev, du_rev, rcond=None) cbp, _, _, _ = np.linalg.lstsq(A_rev, dv_rev, rcond=None) for i, (p, q) in enumerate(pq_rev): s = scale ** (p + q) cap[i] /= s cbp[i] /= s header['AP_ORDER'] = order header['BP_ORDER'] = order for i, (p, q) in enumerate(pq_rev): if abs(cap[i]) > 1e-20: header['AP_%d_%d' % (p, q)] = float(cap[i]) if abs(cbp[i]) > 1e-20: header['BP_%d_%d' % (p, q)] = float(cbp[i]) return header
def _fit_tpv_coefficients(wcs_orig, header, order=5): """Numerically fit TPV polynomial coefficients for a non-TAN WCS. Samples the original WCS on a dense pixel grid, projects the sky coordinates through a TAN gnomonic projection, and fits TPV polynomial coefficients to reproduce the mapping. Parameters ---------- wcs_orig : `~astropy.wcs.WCS` Original WCS (any projection, with or without SIP). header : `~astropy.io.fits.Header` Header with CRVAL/CRPIX/CD (will be modified in-place to add PV keywords and set CTYPE to TPV). order : int Maximum polynomial order for the TPV fit (default 5). Returns ------- header : modified header with PV keywords and CTYPE set to TPV. """ nx = int(header.get('NAXIS1', 1024)) ny = int(header.get('NAXIS2', 1024)) # Dense pixel grid (avoid edges by half-pixel margin) ngrid = int(max(50, min(200, max(nx, ny) // 10))) gx = np.linspace(0, nx - 1, ngrid) gy = np.linspace(0, ny - 1, ngrid) xx, yy = np.meshgrid(gx, gy) xf, yf = xx.ravel(), yy.ravel() # Sky coordinates from the original WCS ra, dec = wcs_orig.all_pix2world(xf, yf, 0) # Filter out NaN / non-finite (can happen near edges) good = np.isfinite(ra) & np.isfinite(dec) xf, yf, ra, dec = xf[good], yf[good], ra[good], dec[good] # CRVAL and CD matrix from header crval1 = header['CRVAL1'] crval2 = header['CRVAL2'] crpix1 = header['CRPIX1'] crpix2 = header['CRPIX2'] # Build CD matrix if 'CD1_1' in header: cd = np.array( [ [header['CD1_1'], header.get('CD1_2', 0)], [header.get('CD2_1', 0), header['CD2_2']], ] ) else: pc = np.array( [ [header.get('PC1_1', 1), header.get('PC1_2', 0)], [header.get('PC2_1', 0), header.get('PC2_2', 1)], ] ) cdelt = np.array([header.get('CDELT1', 1), header.get('CDELT2', 1)]) cd = pc * cdelt[:, None] # Intermediate pixel coords: (u, v) = CD · (x - crpix, y - crpix) dx = xf - (crpix1 - 1) # 0-based pixel coords → 1-based offset dy = yf - (crpix2 - 1) u = cd[0, 0] * dx + cd[0, 1] * dy v = cd[1, 0] * dx + cd[1, 1] * dy # TAN gnomonic projection: (RA, Dec) → (xi, eta) in degrees ra0 = np.radians(crval1) dec0 = np.radians(crval2) ra_r = np.radians(ra) dec_r = np.radians(dec) cos_dec = np.cos(dec_r) sin_dec = np.sin(dec_r) cos_dec0 = np.cos(dec0) sin_dec0 = np.sin(dec0) dra = ra_r - ra0 denom = sin_dec * sin_dec0 + cos_dec * cos_dec0 * np.cos(dra) xi = np.degrees(cos_dec * np.sin(dra) / denom) eta = np.degrees((sin_dec * cos_dec0 - cos_dec * sin_dec0 * np.cos(dra)) / denom) # Build TPV polynomial design matrix # TPV basis for axis 1: terms in (u, v), axis 2: terms in (v, u) # Following the TPV convention, excluding radial terms at # indices 3, 11, 23 which are degenerate with polynomial terms def _tpv_basis(p, q, max_order): """Build TPV polynomial basis. p is the "primary" variable (u for axis 1, v for axis 2), q is the "secondary" variable. """ terms = [] indices = [] # Mapping from (total_order, sub_index) to PV index # Order 0: PV_0 = 1 # Order 1: PV_1 = p, PV_2 = q # Order 2: PV_4 = p², PV_5 = pq, PV_6 = q² # Order 3: PV_7 = p³, PV_8 = p²q, PV_9 = pq², PV_10 = q³ # (PV_3=r and PV_11=r³ are radial, skipped) # Order 4: PV_12..PV_16 # Order 5: PV_17..PV_22 (PV_23=r⁵ skipped) # Order 6: PV_24..PV_30 # Order 7: PV_31..PV_38 (PV_39=r⁷ skipped) pv_idx = 0 for total in range(max_order + 1): for j in range(total + 1): i = total - j # p^i * q^j terms.append(p**i * q**j) indices.append(pv_idx) pv_idx += 1 # Skip radial term after orders 1, 3, 5, 7 if total in (1, 3, 5, 7): pv_idx += 1 # skip PV_3, PV_11, PV_23, PV_39 return np.column_stack(terms), indices A1, idx1 = _tpv_basis(u, v, order) A2, idx2 = _tpv_basis(v, u, order) # Fit via least squares: xi = A1 @ c1, eta = A2 @ c2 c1, _, _, _ = np.linalg.lstsq(A1, xi, rcond=None) c2, _, _, _ = np.linalg.lstsq(A2, eta, rcond=None) # Write PV keywords for i, pv_idx in enumerate(idx1): if abs(c1[i]) > 1e-16: header['PV1_%d' % pv_idx] = float(c1[i]) for i, pv_idx in enumerate(idx2): if abs(c2[i]) > 1e-16: header['PV2_%d' % pv_idx] = float(c2[i]) header['CTYPE1'] = 'RA---TPV' header['CTYPE2'] = 'DEC--TPV' return header
[docs] def wcs_sip2pv(header): """Convert a WCS header from SIP (or any projection) to TPV representation. The original WCS is sampled on a dense pixel grid and TPV polynomial coefficients are fitted to reproduce the same pixel→sky mapping via least squares. Works for any projection type (TAN-SIP, ZPN-SIP, etc.) with sub-milliarcsecond accuracy. Parameters ---------- header : astropy.io.fits.Header Input FITS header with WCS keywords. Returns ------- astropy.io.fits.Header New header with TPV WCS (``CTYPE`` set to ``RA---TPV``). """ header = header.copy() # Parse the original WCS before modifying the header wcs_orig = WCS(header) # Ensure CD matrix is present (convert PC+CDELT → CD) if 'CD1_1' not in header and 'PC1_1' in header: cdelt = [header.get('CDELT1'), header.get('CDELT2')] header['CD1_1'] = header.pop('PC1_1') * cdelt[0] header['CD2_1'] = header.pop('PC2_1') * cdelt[0] header['CD1_2'] = header.pop('PC1_2') * cdelt[0] header['CD2_2'] = header.pop('PC2_2') * cdelt[0] # Strip SIP and old PV keywords before fitting new ones for key in list(header.keys()): if key.startswith(('A_', 'B_', 'AP_', 'BP_', 'PV')): del header[key] _fit_tpv_coefficients(wcs_orig, header) return header
[docs] def table_to_ldac(table, header=None, writeto=None): primary_hdu = fits.PrimaryHDU() header_str = header.tostring(endcard=True) # FIXME: this is a quick and dirty hack to preserve final 'END ' in the string # as astropy.io.fits tends to strip trailing whitespaces from string data, and it breaks at least SCAMP header_str += fits.Header().tostring(endcard=True) header_col = fits.Column( name='Field Header Card', format='%dA' % len(header_str), array=[header_str] ) header_hdu = fits.BinTableHDU.from_columns(fits.ColDefs([header_col])) header_hdu.header['EXTNAME'] = 'LDAC_IMHEAD' data_hdu = fits.table_to_hdu(table) data_hdu.header['EXTNAME'] = 'LDAC_OBJECTS' hdulist = fits.HDUList([primary_hdu, header_hdu, data_hdu]) if writeto is not None: hdulist.writeto(writeto, overwrite=True) return hdulist
[docs] def refine_wcs_scamp( obj, cat=None, wcs=None, header=None, sr=2 / 3600, order=3, cat_col_ra='RAJ2000', cat_col_dec='DEJ2000', cat_col_ra_err='e_RAJ2000', cat_col_dec_err='e_DEJ2000', cat_col_mag='rmag', cat_col_mag_err='e_rmag', cat_mag_lim=99, sn=None, position_maxerr=None, posangle_maxerr=None, pixscale_maxerr=None, match_flipped=False, extra={}, get_header=False, update=False, _workdir=None, _tmpdir=None, _exe=None, verbose=False, ): """Wrapper for running SCAMP to get a refined astrometric solution. Parameters ---------- obj : astropy.table.Table List of objects on the frame, must contain at least ``x``, ``y``, and ``flux`` columns. cat : astropy.table.Table or str, optional Reference astrometric catalogue, or a SCAMP network catalogue name (e.g. ``'GAIA-DR2'``). wcs : astropy.wcs.WCS, optional Initial WCS solution. header : astropy.io.fits.Header, optional FITS header containing the initial astrometric solution. sr : float, optional Matching radius in degrees. Controls SCAMP's ``CROSSID_RADIUS``. order : int, optional Polynomial order for PV distortion solution (1 or greater). cat_col_ra : str, optional Catalogue column name for Right Ascension. cat_col_dec : str, optional Catalogue column name for Declination. cat_col_ra_err : str, optional Catalogue column name for Right Ascension error. cat_col_dec_err : str, optional Catalogue column name for Declination error. cat_col_mag : str, optional Catalogue column name for magnitude. cat_col_mag_err : str, optional Catalogue column name for magnitude error. cat_mag_lim : float or sequence of float, optional Magnitude limit(s) for catalogue stars. A single value is used as an upper limit; two values ``[min, max]`` define a range. sn : float or sequence of float, optional S/N threshold(s) for SCAMP (passed as ``SN_THRESHOLDS``). position_maxerr : float, optional Maximum positional uncertainty of the initial WCS in arcminutes. Controls the search range for field center offset during pattern matching. Default is SCAMP's built-in default of 1 arcmin. posangle_maxerr : float, optional Maximum position angle uncertainty in degrees. Default is SCAMP's built-in default of 5 degrees. pixscale_maxerr : float, optional Maximum pixel scale uncertainty as a multiplicative factor (>1.0). E.g. 1.2 means ±20% around the initial scale. Default is SCAMP's built-in default of 1.2. match_flipped : bool, optional If True, also try matching with flipped (mirrored) axes. Default is False. extra : dict, optional Additional parameters to pass to the SCAMP binary. get_header : bool, optional If True, return the FITS header object instead of WCS solution. update : bool, optional If True, update object sky coordinates in-place using the new WCS. _workdir : str, optional If specified, all temporary files will be kept in this directory after the run. _tmpdir : str, optional If specified, temporary files will be created inside this path. _exe : str, optional Full path to the SCAMP executable. Auto-detected from :envvar:`PATH` if not provided. verbose : bool or callable, optional Whether to show verbose messages. May be boolean or a ``print``-like callable. Returns ------- astropy.wcs.WCS or astropy.io.fits.Header or None Refined astrometric solution, or FITS header if ``get_header=True``, or None on failure. """ # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Find the binary binname = None if _exe is not None: # Check user-provided binary path, and fail if not found if os.path.isfile(_exe): binname = _exe else: # Find SExtractor binary in common paths for exe in ['scamp']: binname = shutil.which(exe) if binname is not None: break if binname is None: log("Can't find SCAMP binary") return None # else: # log("Using SCAMP binary at", binname) workdir = _workdir if _workdir is not None else tempfile.mkdtemp(prefix='scamp', dir=_tmpdir) if header is None: # Construct minimal FITS header covering our data points header = fits.Header( { 'NAXIS': 2, 'NAXIS1': np.max(obj['x'] + 1), 'NAXIS2': np.max(obj['y'] + 1), 'BITPIX': -64, 'EQUINOX': 2000.0, } ) else: header = header.copy() if wcs is not None and wcs.is_celestial: # Add WCS information to the header header += wcs.to_header(relax=True) # Ensure the header is in TPV convention, as SCAMP does not support SIP if wcs.sip is not None: log("Converting the header from SIP to TPV convention") header = wcs_sip2pv(header) else: log("Can't operate without initial WCS") return None # Dummy config filename, to prevent loading from current dir confname = os.path.join(workdir, 'empty.conf') utils.file_write(confname) xmlname = os.path.join(workdir, 'scamp.xml') opts = { 'c': confname, 'VERBOSE_TYPE': 'QUIET', 'SOLVE_PHOTOM': 'N', 'CHECKPLOT_TYPE': 'NONE', 'WRITE_XML': 'Y', 'XML_NAME': xmlname, 'PROJECTION_TYPE': 'TPV', 'CROSSID_RADIUS': sr * 3600, 'DISTORT_DEGREES': max(1, order), 'STABILITY_TYPE': 'EXPOSURE', 'SN_THRESHOLDS': [3.0, 30.0], } if sn is not None: if np.isscalar(sn): opts['SN_THRESHOLDS'] = [sn, 10 * sn] else: opts['SN_THRESHOLDS'] = [sn[0], sn[1]] # Pattern matching search range parameters if position_maxerr is not None: opts['POSITION_MAXERR'] = position_maxerr if posangle_maxerr is not None: opts['POSANGLE_MAXERR'] = posangle_maxerr if pixscale_maxerr is not None: opts['PIXSCALE_MAXERR'] = pixscale_maxerr if match_flipped: opts['MATCH_FLIPPED'] = 'Y' opts.update(extra) # Minimal LDAC table with objects t_obj = Table( data={ 'XWIN_IMAGE': obj['x'] + 1, # SCAMP uses 1-based coordinates 'YWIN_IMAGE': obj['y'] + 1, 'ERRAWIN_IMAGE': obj['xerr'], 'ERRBWIN_IMAGE': obj['yerr'], 'FLUX_AUTO': obj['flux'], 'FLUXERR_AUTO': obj['fluxerr'], 'MAG_AUTO': obj['mag'], 'MAGERR_AUTO': obj['magerr'], 'FLAGS': obj['flags'], } ) objname = os.path.join(workdir, 'objects.cat') table_to_ldac(t_obj, header, objname) hdrname = os.path.join(workdir, 'objects.head') opts['HEADER_NAME'] = hdrname if os.path.exists(hdrname): os.unlink(hdrname) if cat: if type(cat) == str: # Match with network catalogue by name opts['ASTREF_CATALOG'] = cat log('Using', cat, 'as a network catalogue') else: # Match with user-provided catalogue t_cat = Table( data={ 'X_WORLD': cat[cat_col_ra], 'Y_WORLD': cat[cat_col_dec], 'ERRA_WORLD': utils.table_get(cat, cat_col_ra_err, 1 / 3600), 'ERRB_WORLD': utils.table_get(cat, cat_col_dec_err, 1 / 3600), 'MAG': utils.table_get(cat, cat_col_mag, 0), 'MAGERR': utils.table_get(cat, cat_col_mag_err, 0.01), 'OBSDATE': np.ones_like(cat[cat_col_ra]) * 2000.0, 'FLAGS': np.zeros_like(cat[cat_col_ra], dtype=int), } ) # Remove masked values for _ in t_cat.colnames: if np.ma.is_masked(t_cat[_]): t_cat = t_cat[~t_cat[_].mask] # Convert units of err columns to degrees, if any for _ in ['ERRA_WORLD', 'ERRB_WORLD']: if t_cat[_].unit and t_cat[_].unit != 'deg': t_cat[_] = t_cat[_].to('deg') # Limit the catalogue to given magnitude range if cat_mag_lim is not None: if hasattr(cat_mag_lim, '__len__') and len(cat_mag_lim) == 2: # Two elements provided, treat them as lower and upper limits t_cat = t_cat[ (t_cat['MAG'] >= cat_mag_lim[0]) & (t_cat['MAG'] <= cat_mag_lim[1]) ] else: # One element provided, treat it as upper limit t_cat = t_cat[t_cat['MAG'] <= cat_mag_lim] catname = os.path.join(workdir, 'catalogue.cat') table_to_ldac(t_cat, header, catname) opts['ASTREF_CATALOG'] = 'FILE' opts['ASTREFCAT_NAME'] = catname log('Using user-provided local catalogue') else: log('Using default settings for network catalogue') # Build the command line command = binname + ' ' + shlex.quote(objname) + ' ' + utils.format_astromatic_opts(opts) if not verbose: command += ' > /dev/null 2>/dev/null' log('Will run SCAMP like that:') log(command) # Run the command! res = os.system(command) wcs = None if res == 0 and os.path.exists(hdrname) and os.path.exists(xmlname): log('SCAMP run successfully') diag = Table.read(xmlname, table_id=0)[0] # Log detailed diagnostics from SCAMP XML n_matches = int(diag['NDeg_Reference']) reduced_chi2 = float(diag['Chi2_Reference']) log('Reference matches: %d, reduced chi2: %.2f' % (n_matches, reduced_chi2)) # Log matching diagnostics if available (pattern matching was performed) for key in ['AS_Contrast', 'XY_Contrast', 'DPixelScale', 'DPosAngle', 'Shear']: if key in diag.colnames: log(' %s: %s' % (key, diag[key])) # Log astrometric residuals if available for key in ['AstromOffset_Reference', 'AstromSigma_Reference']: if key in diag.colnames: val = diag[key] if hasattr(val, '__len__'): log(' %s: %s arcsec' % (key, ' '.join('%.3f' % v for v in val))) else: log(' %s: %.3f arcsec' % (key, val)) # Validate the solution quality # Chi2_Reference from SCAMP is already a reduced chi-squared # (sum of chi2 divided by naxis * n_matches), so we check it directly if n_matches < 3: log('Too few matches (%d), fitting likely failed' % n_matches) elif reduced_chi2 > 100: log('Reduced chi2 too large (%.1f), fitting likely failed' % reduced_chi2) else: with open(hdrname, 'r') as f: h1 = fits.Header.fromstring(f.read().encode('ascii', 'ignore'), sep='\n') # Sometimes SCAMP returns TAN type solution even despite PV keywords present if h1['CTYPE1'] != 'RA---TPV' and 'PV1_0' in h1.keys(): log( 'Got WCS solution with CTYPE1 =', h1['CTYPE1'], ' and PV keywords, fixing it', ) h1['CTYPE1'] = 'RA---TPV' h1['CTYPE2'] = 'DEC--TPV' # .. while sometimes it does the opposite elif h1['CTYPE1'] == 'RA---TPV' and 'PV1_0' not in h1.keys(): log( 'Got WCS solution with CTYPE1 =', h1['CTYPE1'], ' and without PV keywords, fixing it', ) h1['CTYPE1'] = 'RA---TAN' h1['CTYPE2'] = 'DEC--TAN' h1 = WCS(h1).to_header(relax=True) if get_header: # FIXME: should we really return raw / unfixed header here?.. log('Returning raw header instead of WCS solution') wcs = h1 else: wcs = WCS(h1) if update: obj['ra'], obj['dec'] = wcs.all_pix2world(obj['x'], obj['y'], 0) log( 'Astrometric accuracy: %.2f" %.2f"' % (h1.get('ASTRRMS1', 0) * 3600, h1.get('ASTRRMS2', 0) * 3600) ) else: log('Error', res, 'running SCAMP') wcs = None if _workdir is None: shutil.rmtree(workdir) return wcs
[docs] def store_wcs(filename, wcs, overwrite=True): """Store WCS information in an (empty) FITS file. Parameters ---------- filename : str Path to the output FITS file. wcs : astropy.wcs.WCS WCS object to store. overwrite : bool, optional If True, overwrite an existing file. Default is True. """ dirname = os.path.split(filename)[0] try: # For Python3, we may simply use exists_ok=True to avoid wrapping it inside try-cache os.makedirs(dirname) except: pass hdu = fits.PrimaryHDU(header=wcs.to_header(relax=True)) hdu.writeto(filename, overwrite=overwrite)
[docs] def upscale_wcs(wcs, scale=2, will_rebin=False): """Return a WCS corresponding to a frame upscaled by a given factor. Parameters ---------- wcs : astropy.wcs.WCS Input WCS to upscale. scale : float, optional Upscaling factor (not necessarily integer). Default is 2. will_rebin : bool, optional If True, adjust CRPIX so that rebinning the result back to original resolution with :func:`utils.rebin_image` will not introduce a shift. Returns ------- astropy.wcs.WCS WCS corresponding to the upscaled frame. """ with warnings.catch_warnings(): warnings.simplefilter('ignore') whdr = wcs.to_header(relax=True) for _ in ['CRPIX1', 'CRPIX2']: whdr[_] = (whdr[_] - 1) * scale + 1 for _ in ['PC1_1', 'PC1_2', 'PC2_1', 'PC2_2']: whdr[_] /= scale if 'SIP' in whdr['CTYPE1']: # SIP-type distortions for i in range(0, whdr.get('A_ORDER', 0) + 1): for j in range(0, whdr.get('A_ORDER', 0) + 1): if 'A_%d_%d' % (i, j) in whdr: whdr['A_%d_%d' % (i, j)] /= scale ** (i + j - 1) for i in range(0, whdr.get('B_ORDER', 0) + 1): for j in range(0, whdr.get('B_ORDER', 0) + 1): if 'B_%d_%d' % (i, j) in whdr: whdr['B_%d_%d' % (i, j)] /= scale ** (i + j - 1) for i in range(0, whdr.get('AP_ORDER', 0) + 1): for j in range(0, whdr.get('AP_ORDER', 0) + 1): if 'AP_%d_%d' % (i, j) in whdr: whdr['AP_%d_%d' % (i, j)] /= scale ** (i + j - 1) for i in range(0, whdr.get('BP_ORDER', 0) + 1): for j in range(0, whdr.get('BP_ORDER', 0) + 1): if 'BP_%d_%d' % (i, j) in whdr: whdr['BP_%d_%d' % (i, j)] /= scale ** (i + j - 1) new = WCS(whdr) if will_rebin: # Switch from conserving pixel center position to pixel corner, so # that naive downscaling back will give the same image as original new.wcs.crpix -= -0.5 * scale + 0.5 return new
[docs] def fit_astrometric_residuals( x_obs, y_obs, x_pred, y_pred, *, image_shape=None, backend='grid', **smoother_kwargs, ): """Fit a smooth (dx, dy) correction field for astrometric residuals. Given matched pairs of observed and WCS-predicted pixel positions of the same sources, fit a smooth two-component field that maps any predicted position to the corresponding observed one. The returned callable applies the additive correction at arbitrary positions. Parameters ---------- x_obs, y_obs : array-like, shape (N,) Observed pixel positions of matched sources (e.g. SEP centroids). x_pred, y_pred : array-like, shape (N,) WCS-predicted pixel positions of the same sources from a reference catalogue. ``x_obs - x_pred`` is the residual being modelled. image_shape : (H, W), optional Forwarded to the grid backend; ignored by LOESS. backend : {"grid", "loess"} Smoothing backend; see :func:`stdpipe.smoothing.fit_vector_field_2d`. Default ``"grid"`` because the typical astrometric-residual workflow evaluates the correction at far more positions than were used to fit it (e.g. forced-position photometry of large catalogues), where grid lookup is ~600--1000x faster than LOESS prediction. **smoother_kwargs : Forwarded to :func:`fit_vector_field_2d`. Grid backend keys: ``grid_shape``, ``min_per_cell``, ``smooth_sigma``. LOESS backend keys: ``scales``, ``k``, ``robust_iters``, ... Returns ------- correct : callable ``correct(x, y) -> (x_corrected, y_corrected)``: adds the smooth ``(dx, dy)`` field at ``(x, y)``. Has a ``.smoother`` attribute holding the underlying field model for diagnostics. """ from . import smoothing x_obs = np.asarray(x_obs, float) y_obs = np.asarray(y_obs, float) x_pred = np.asarray(x_pred, float) y_pred = np.asarray(y_pred, float) smoother = smoothing.fit_vector_field_2d( x_pred, y_pred, x_obs - x_pred, y_obs - y_pred, backend=backend, image_shape=image_shape, **smoother_kwargs, ) def correct(x, y): cdx, cdy = smoother(x, y) return np.asarray(x, float) + cdx, np.asarray(y, float) + cdy correct.smoother = smoother return correct
[docs] def refine_positions_from_catalog( match, obj, cat, wcs, *, obj_col_x='x', obj_col_y='y', cat_col_ra='ra', cat_col_dec='dec', image_shape=None, backend='grid', **smoother_kwargs, ): """Build an astrometric residual correction from a match dict. Wraps :func:`fit_astrometric_residuals` with the column-extraction boilerplate needed when the matches come from :func:`stdpipe.pipeline.calibrate_photometry` or :func:`stdpipe.photometry.match`. Returns the correction callable plus a small dictionary of pre/post-correction residual statistics. Parameters ---------- match : dict Cross-match dictionary with at least the keys ``idx`` (boolean mask of accepted matches), ``oidx`` (object indices into ``obj``), and ``cidx`` (catalogue indices into ``cat``). obj, cat : astropy.table.Table Object and catalogue tables matched by ``match``. wcs : astropy.wcs.WCS WCS used to project catalogue positions into pixel coordinates. obj_col_x, obj_col_y : str Column names for observed pixel positions in ``obj``. cat_col_ra, cat_col_dec : str Column names for sky positions in ``cat``. image_shape : (H, W), optional Forwarded to the smoothing backend. backend : {"grid", "loess"} Smoothing backend. **smoother_kwargs : Forwarded to :func:`fit_astrometric_residuals`. Returns ------- correct : callable Same as :func:`fit_astrometric_residuals`. info : dict ``n_matched``, ``n_used``, ``raw_median_dr_pix``, ``corrected_median_dr_pix``, ``raw_q90_dr_pix``, ``corrected_q90_dr_pix``. The corrected statistics are evaluated at the same matched positions used for the fit (in-sample), so they over-state the gain; for an unbiased estimate, subset ``match['idx']`` into a holdout mask before calling this. """ accepted = np.asarray(match['idx'], bool) oidx = np.asarray(match['oidx'])[accepted] cidx = np.asarray(match['cidx'])[accepted] x_obs = np.asarray(obj[obj_col_x][oidx], float) y_obs = np.asarray(obj[obj_col_y][oidx], float) x_pred, y_pred = wcs.all_world2pix( np.asarray(cat[cat_col_ra][cidx], float), np.asarray(cat[cat_col_dec][cidx], float), 0, ) keep = ( np.isfinite(x_obs) & np.isfinite(y_obs) & np.isfinite(x_pred) & np.isfinite(y_pred) ) x_obs = x_obs[keep]; y_obs = y_obs[keep] x_pred = x_pred[keep]; y_pred = y_pred[keep] correct = fit_astrometric_residuals( x_obs, y_obs, x_pred, y_pred, image_shape=image_shape, backend=backend, **smoother_kwargs, ) raw_dx = x_obs - x_pred raw_dy = y_obs - y_pred pred_dx, pred_dy = correct.smoother(x_pred, y_pred) corr_dr = np.hypot(raw_dx - pred_dx, raw_dy - pred_dy) raw_dr = np.hypot(raw_dx, raw_dy) info = { 'n_matched': int(np.sum(accepted)), 'n_used': int(keep.sum()), 'raw_median_dr_pix': float(np.median(raw_dr)) if raw_dr.size else float('nan'), 'corrected_median_dr_pix': float(np.median(corr_dr)) if corr_dr.size else float('nan'), 'raw_q90_dr_pix': float(np.percentile(raw_dr, 90)) if raw_dr.size else float('nan'), 'corrected_q90_dr_pix': float(np.percentile(corr_dr, 90)) if corr_dr.size else float('nan'), } return correct, info