"""
Module for cropping the images and creating image cutouts / postage stamps
"""
import numpy as np
import datetime
from astropy.wcs import WCS
from astropy.io import fits
from astropy.time import Time
from scipy.stats import chi2
from scipy.optimize import minimize
from scipy.ndimage import shift
from . import utils
from . import astrometry
[docs]
def crop_image_centered(data, x0, y0, r0, header=None):
"""Crop a square region centered on a given pixel position.
Output size is ``2*ceil(r0) + 1`` pixels. The original center falls
inside the pixel at ``x, y = ceil(r0), ceil(r0)``. If a FITS header is
provided, it is adjusted so that the WCS solution remains valid for the
cutout.
Parameters
----------
data : ndarray
2D image array to crop.
x0 : float
X coordinate of the center (0-based).
y0 : float
Y coordinate of the center (0-based).
r0 : float
Cutout half-size in pixels.
header : astropy.io.fits.Header, optional
FITS header to adjust (``CRPIX`` shifted in-place on a copy).
Returns
-------
ndarray or tuple
Cropped image, or ``(cropped_image, adjusted_header)`` if ``header`` is provided.
"""
# x1,x2 = int(np.floor(x0 - r0)), int(np.ceil(x0 + r0))
# y1,y2 = int(np.floor(y0 - r0)), int(np.ceil(y0 + r0))
x1, x2 = int(np.round(x0) - np.ceil(r0)), int(np.round(x0) + np.ceil(r0))
y1, y2 = int(np.round(y0) - np.ceil(r0)), int(np.round(y0) + np.ceil(r0))
return crop_image(data, x1, y1, x2 - x1 + 1, y2 - y1 + 1, header=header)
[docs]
def crop_image(data, x1, y1, width, height, header=None):
"""Crop a rectangular region from an image.
Pixels outside the original image boundary are filled with zeros (or NaN
for floating-point images). If a FITS header is provided, ``CRPIX`` is
adjusted so that the WCS solution remains valid for the cutout.
Parameters
----------
data : ndarray
2D image array to crop.
x1 : int
Left edge of the crop region (0-based).
y1 : int
Bottom edge of the crop region (0-based).
width : int
Width of the crop region in pixels.
height : int
Height of the crop region in pixels.
header : astropy.io.fits.Header, optional
FITS header to adjust (copy is modified, original unchanged).
Returns
-------
ndarray or tuple
Cropped image, or ``(cropped_image, adjusted_header)`` if ``header`` is provided.
"""
x2 = x1 + width
y2 = y1 + height
src = [
min(max(y1, 0), data.shape[0]),
max(min(y2, data.shape[0]), 0),
min(max(x1, 0), data.shape[1]),
max(min(x2, data.shape[1]), 0),
]
dst = [src[0] - y1, src[1] - y1, src[2] - x1, src[3] - x1]
sub = np.zeros((y2 - y1, x2 - x1), data.dtype)
if isinstance(data[0][0], np.floating):
# For floating-point we may use NaN as a filler value
sub.fill(np.nan)
sub[dst[0] : dst[1], dst[2] : dst[3]] = data[src[0] : src[1], src[2] : src[3]]
if header is not None:
subheader = header.copy()
subheader['NAXIS1'] = sub.shape[1]
subheader['NAXIS2'] = sub.shape[0]
# Adjust the WCS keywords if present
if 'CRPIX1' in subheader and 'CRPIX2' in subheader:
subheader['CRPIX1'] -= x1
subheader['CRPIX2'] -= y1
# FIXME: should we use 0-based or 1-based coordinates here?..
# Crop position inside original frame
subheader['CROP_X1'] = x1
subheader['CROP_X2'] = x2
subheader['CROP_Y1'] = y1
subheader['CROP_Y2'] = y2
return sub, subheader
else:
return sub
[docs]
def get_cutout(
image, candidate, radius, header=None, wcs=None, time=None, filename=None, name=None, **kwargs
):
"""Create a cutout postage stamp from one or more image planes.
The candidate may be a row from :class:`astropy.table.Table` or a dict with
at least ``x`` and ``y`` keys. The cutout is centered so the object falls
inside the central pixel; size is ``2*ceil(radius) + 1`` pixels.
Parameters
----------
image : ndarray
Primary (science) image plane.
candidate : table row or dict
Object record containing at least ``x`` and ``y`` pixel coordinates.
radius : float
Cutout half-size in pixels.
header : astropy.io.fits.Header, optional
Header of the original image. Copied and adjusted to represent the
cutout WCS; stored as ``cutout['header']``.
wcs : astropy.wcs.WCS, optional
WCS of the original image. Adjusted and stored as ``cutout['wcs']``.
Takes precedence over ``header`` for WCS.
time : astropy.time.Time, datetime, or str, optional
Observation timestamp; stored in ``cutout['meta']['time']``.
filename : str, optional
Source image filename; stored in ``cutout['meta']['filename']``.
name : str, optional
Object name; stored in ``cutout['meta']['name']``. If omitted and
``candidate`` has ``ra`` / ``dec``, a J2000 name is constructed.
**kwargs
Additional 2D arrays interpreted as extra image planes
(e.g. ``mask``, ``diff``, ``template``, ``convolved``, ``err``).
Returns
-------
dict
Cutout dictionary with at least:
- ``image`` — primary image plane
- ``mask``, ``diff``, ``template``, etc. — extra planes if provided
- ``header`` — adjusted FITS header, if provided
- ``wcs`` — adjusted WCS, if provided
- ``meta`` — dict with all candidate fields plus ``time``, ``filename``,
and ``name``
"""
x0, y0 = candidate['x'], candidate['y']
_ = crop_image_centered(image, x0, y0, radius, header=header)
if header is not None:
crop, crophead = _
else:
crop, crophead = _, None
cutout = {'image': crop, 'meta': {}}
if wcs is not None:
cutout['wcs'] = wcs
# Let's update header info with this WCS
astrometry.clear_wcs(crophead, remove_underscored=True, remove_history=True)
crophead += wcs.to_header(relax=True)
if crophead is not None:
cutout['header'] = crophead
if wcs is None:
wcs = WCS(crophead)
cutout['wcs'] = wcs
# Image planes
for pname, plane in kwargs.items():
if plane is not None and np.ndim(plane) == 2:
cutout[pname] = crop_image_centered(plane, x0, y0, radius)
# Metadata
for _ in candidate.keys():
cutout['meta'][_] = candidate[_]
# Additional metadata to add or override
if time is not None:
cutout['meta']['time'] = Time(time)
if filename is not None:
cutout['meta']['filename'] = filename
if name is not None:
cutout['meta']['name'] = name
elif (
'name' not in cutout['meta']
and 'ra' in cutout['meta'].keys()
and 'dec' in cutout['meta'].keys()
):
cutout['meta']['name'] = utils.make_jname(candidate['ra'], candidate['dec'])
return cutout
[docs]
def write_cutout(cutout, filename):
"""Store a cutout as a multi-extension FITS file.
Each image plane becomes a named FITS extension; metadata is stored as
keywords in the primary header.
Parameters
----------
cutout : dict
Cutout structure as returned by :func:`get_cutout`.
filename : str
Output FITS filename.
"""
hdus = []
# Store metadata to primary header
hdu = fits.PrimaryHDU()
for _ in cutout['meta']:
data = cutout['meta'][_]
# Special handling for unsupported FITS types
if data is None:
data = None
elif type(data) == Time or type(data) == datetime.datetime:
data = Time(data).to_value('fits')
elif not np.isscalar(data):
# NumPy arrays and like
data = repr(np.ma.filled(data))
elif np.isreal(data) and np.isnan(data):
data = 'NaN'
elif np.isreal(data) and not np.isfinite(data):
data = 'Inf'
hdu.header[_] = data
for _ in [
'x',
'y',
'ra',
'dec',
'mag',
'magerr',
'mag_calib',
'flags',
'id',
'time',
'filename',
]:
if _ in cutout:
data = cutout[_]
# Special handling for unsupported FITS types
if _ == 'time':
data = Time(data).to_value('fits')
hdu.header[_] = data
hdus.append(hdu)
# Store imaging data to named extensions
for _ in cutout.keys():
if _ not in ['meta', 'header', 'wcs'] and np.ndim(cutout[_]) == 2:
data = cutout[_]
if data.dtype == bool:
data = data.astype(np.uint16)
hdu = fits.ImageHDU(data, header=cutout.get('header'), name=_)
hdus.append(hdu)
fits.HDUList(hdus).writeto(filename, overwrite=True)
[docs]
def load_cutout(filename):
"""Restore a cutout from a multi-extension FITS file.
Parameters
----------
filename : str
Path to a FITS file written by :func:`write_cutout`.
Returns
-------
dict
Cutout structure as returned by :func:`get_cutout`.
"""
hdus = fits.open(filename)
cutout = {'meta': {}}
for _ in hdus[0].header[4:]:
name = _.lower()
data = hdus[0].header[_]
if name == 'time':
data = Time(data)
elif data == 'NaN':
data = np.nan
elif data == 'Inf':
data = np.inf
elif isinstance(data, str) and data.startswith('array('):
# FIXME: any safer way to convert it back to array?..
array = np.array
data = eval(data)
cutout['meta'][name] = data
for hdu in hdus[1:]:
if 'header' not in cutout:
cutout['header'] = hdu.header
name = hdu.name.lower()
cutout[name] = hdu.data
# Special handling of mask plane
if name == 'mask':
cutout[name] = cutout[name].astype(bool)
hdus.close()
if cutout['header']:
cutout['wcs'] = WCS(cutout['header'])
return cutout
[docs]
def adjust_cutout(
cutout,
max_shift=2,
max_scale=1.1,
inner=None,
normalize=False,
fit_bg=False,
verbose=False,
):
"""Fit a positional and flux-scaling adjustment to minimize the image–template residual.
Optimizes a shift (dx, dy), scale, and optionally background levels to
minimize the chi-squared residual between ``cutout['image']`` and
``cutout['convolved']``. On success, adds a ``cutout['adjusted']`` plane
with the optimized difference.
Parameters
----------
cutout : dict
Cutout structure as returned by :func:`get_cutout`. Must contain
``image``, ``convolved``, and ``err`` planes.
max_shift : float, optional
Maximum allowed positional shift in pixels (symmetric bound).
max_scale : float, optional
Maximum allowed flux scale factor; scale is bounded to
``(1/max_scale, max_scale)``.
inner : int, optional
If set, only the central ``inner × inner`` pixel box is used for
optimization.
normalize : bool, optional
If True, divide the ``adjusted`` plane by the ``err`` plane.
fit_bg : bool, optional
If True, fit background levels as free parameters. If False, estimate
backgrounds from the cutout using SExtractor-mode
(``2.5*median − 1.5*mean``) and hold them fixed.
verbose : bool or callable, optional
Whether to show verbose messages. May be boolean or a ``print``-like callable.
Returns
-------
bool
``True`` if optimization succeeded, ``False`` otherwise.
Notes
-----
On success the following keys are added to ``cutout['meta']``:
``adjust_chi2_0``, ``adjust_chi2``, ``adjust_df``, ``adjust_pval``,
``adjust_dx``, ``adjust_dy``, ``adjust_scale``, ``adjust_bg``, ``adjust_tbg``.
"""
# Simple wrapper around print for logging in verbose mode only
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
mask = cutout['mask'] if 'mask' in cutout else ~np.isfinite(cutout['image'])
imask = np.zeros_like(mask)
# Rough estimation of backgrounds in the image and the template, using SExtractor-like mode estimation
# bg = np.nanmedian(cutout['image'][~mask])
bg = 2.5 * np.nanmedian(cutout['image'][~mask]) - 1.5 * np.nanmean(cutout['image'][~mask])
# tbg = np.nanmedian(cutout['convolved'][~mask])
tbg = 2.5 * np.nanmedian(cutout['convolved'][~mask]) - 1.5 * np.nanmean(
cutout['convolved'][~mask]
)
if inner is not None and inner > 0:
# Mask everything outside of a central box with given size
x, y = np.mgrid[0 : mask.shape[1], 0 : mask.shape[0]]
idx = np.abs(x - mask.shape[1] / 2 + 0.5) > inner / 2
idx |= np.abs(y - mask.shape[0] / 2 + 0.5) > inner / 2
imask[idx] = True
# Prepare and cleanup the arrays we will use for fitting (as 'shift' does not like nans etc)
image = cutout['image'].copy()
tmpl = cutout['convolved'].copy()
err = cutout['err'].copy()
image[~np.isfinite(image)] = 0
tmpl[~np.isfinite(tmpl)] = 0
err[~np.isfinite(err)] = 0
def _fn(dx, get_df=False, get_diff=False):
mask1 = mask | shift(mask, dx[:2], mode='reflect')
imask1 = mask1 | imask
diff = image - dx[3] - shift(tmpl - dx[4], dx[:2], mode='reflect') * dx[2]
chi2_value = np.sum((diff[~imask1] / err[~imask1]) ** 2) # Chi2
if get_diff:
return diff, mask1, imask1
elif get_df:
return chi2_value, np.sum(~imask1)
else:
return chi2_value
if fit_bg:
res = minimize(
_fn,
(0, 0, 1, bg, tbg),
bounds=(
(-max_shift, max_shift),
(-max_shift, max_shift),
(1 / max_scale, max_scale),
(None, None),
(None, None),
),
method='Powell',
options={'disp': False},
)
else:
res = minimize(
_fn,
(0, 0, 1, bg, tbg),
bounds=(
(-max_shift, max_shift),
(-max_shift, max_shift),
(1 / max_scale, max_scale),
(bg, bg),
(tbg, tbg),
),
method='Powell',
options={'disp': False},
)
log(res.message)
if res.success:
log(
'Adjustment is: %.2f %.2f bg %.2g tbg %.2g scale %.2f'
% (res.x[0], res.x[1], res.x[3], res.x[4], res.x[2])
)
log('Chi2 improvement: %.2f -> %.2f' % (_fn([0, 0, 1, bg, tbg]), _fn(res.x)))
diff, mask1, _ = _fn(res.x, get_diff=True)
chi2_0 = _fn([0, 0, 1, bg, tbg])
chi2_1, df = _fn(res.x, get_df=True)
log('Final Chi2: %.2f df: %d p-value: %.2g' % (chi2_1, df, chi2.sf(chi2_1, df)))
# Keep the adjustment results in the metadata
cutout['meta']['adjust_chi2_0'] = chi2_0
cutout['meta']['adjust_chi2'] = chi2_1
cutout['meta']['adjust_df'] = df
cutout['meta']['adjust_pval'] = chi2.sf(chi2_1, df)
cutout['meta']['adjust_dx'] = res.x[0]
cutout['meta']['adjust_dy'] = res.x[1]
cutout['meta']['adjust_scale'] = res.x[2]
cutout['meta']['adjust_bg'] = res.x[3]
cutout['meta']['adjust_tbg'] = res.x[4]
if normalize:
diff[~mask1] /= err[~mask1]
diff[err > 1e-30] /= err[err > 1e-30]
# diff[mask1] = 1e-30
# Add result as a new cutout plane
cutout['adjusted'] = diff
return True
else:
return False
[docs]
def downscale_image(image, scale=1, mode='sum', header=None):
"""Downscale an image by an integer factor.
If the image dimensions are not divisible by ``scale``, the image is
cropped to the largest divisible size first. If a FITS header is
provided, the WCS is adjusted for the downscaled pixel grid.
Parameters
----------
image : ndarray
2D image array to downscale.
scale : int, optional
Integer downscaling factor.
mode : str, optional
Pixel reduction mode: ``'sum'`` (default), ``'mean'``, ``'and'``, or ``'or'``.
header : astropy.io.fits.Header, optional
If provided, the WCS is adjusted and a new header is returned alongside
the downscaled image.
Returns
-------
ndarray or tuple
Downscaled image, or ``(downscaled_image, adjusted_header)`` if
``header`` is provided.
"""
# Crop the image if necessary
maxx = scale * (image.shape[1] // scale)
maxy = scale * (image.shape[0] // scale)
image = image[:maxy, :maxx]
shape = (image.shape[0] // scale, scale, image.shape[1] // scale, scale)
image1 = image.reshape(shape)
if mode == 'mean':
image1 = image1.mean(-1).mean(1)
elif mode == 'and':
image1 = np.bitwise_and.reduce(image1, -1)
image1 = np.bitwise_and.reduce(image1, 1)
elif mode == 'or':
image1 = np.bitwise_or.reduce(image1, -1)
image1 = np.bitwise_or.reduce(image1, 1)
else:
# Fallback to mode='sum'
image1 = image1.sum(-1).sum(1)
if header is not None:
wcs = WCS(header)
wcs = wcs[::scale, ::scale]
header1 = astrometry.clear_wcs(header, copy=True)
header1 += wcs.to_header(relax=True)
return image1, header1
else:
return image1