Source code for stdpipe.reproject

"""
Image reprojection routines.

Provides :func:`reproject_swarp` (SWarp wrapper) and :func:`reproject_lanczos`
(pure-Python Lanczos interpolation with SWarp-style oversampling and Jacobian
flux conservation).
"""

import numpy as np

import os
import tempfile
import shlex
import time
import shutil
from concurrent.futures import ThreadPoolExecutor

from astropy.wcs import WCS
from astropy.wcs.utils import proj_plane_pixel_scales
from astropy.io import fits

from . import utils
from . import astrometry


def _pixel_to_pixel(wcs_from, wcs_to, x, y):
    """Map pixel coordinates from one WCS to another.

    Like :func:`astropy.wcs.utils.pixel_to_pixel` but uses ``quiet=True``
    for the inverse SIP transformation so that out-of-domain pixels
    produce NaN instead of raising convergence warnings.
    """
    sky = wcs_from.all_pix2world(x, y, 0)
    try:
        pix = wcs_to.all_world2pix(sky[0], sky[1], 0, quiet=True)
    except Exception:
        # Fallback if quiet mode is not supported
        pix = wcs_to.all_world2pix(sky[0], sky[1], 0)
    return pix


# ---------------------------------------------------------------------------
# Lanczos helpers
# ---------------------------------------------------------------------------


def _lanczos_kernel(x, a):
    """Lanczos kernel of order *a*."""
    x = np.asarray(x, dtype=np.float64)
    result = np.zeros_like(x)
    mask = np.abs(x) < a
    zero = x == 0
    result[zero] = 1.0
    nonzero = mask & ~zero
    xn = x[nonzero]
    result[nonzero] = np.sin(np.pi * xn) * np.sin(np.pi * xn / a) / (np.pi * xn * np.pi * xn / a)
    return result


def _lanczos_map_coordinates(image, coords, a=3, cval=np.nan):
    """Interpolate *image* at fractional pixel coordinates using Lanczos kernel.

    Parameters
    ----------
    image : 2D array
    coords : (2, N) array of (row, col) coordinates
    a : int
        Lanczos kernel order (2, 3 or 4 typical).
    cval : float
        Fill value for out-of-bounds pixels.

    Returns
    -------
    values : 1D array of interpolated values
    """
    ny, nx = image.shape
    n_pts = coords.shape[1]
    result = np.full(n_pts, cval, dtype=np.float64)

    yr, xr = coords[0], coords[1]

    # Filter out-of-bounds
    valid = (yr >= -0.5) & (yr < ny - 0.5) & (xr >= -0.5) & (xr < nx - 0.5)

    yr_v = yr[valid]
    xr_v = xr[valid]

    if len(yr_v) == 0:
        return result

    # Integer and fractional parts
    iy = np.floor(yr_v).astype(int)
    ix = np.floor(xr_v).astype(int)
    fy = yr_v - iy
    fx = xr_v - ix

    # Kernel support: -a+1 to a
    offsets = np.arange(-a + 1, a + 1)

    # Precompute kernels: shape (n_valid, 2*a)
    ky = _lanczos_kernel(fy[:, None] - offsets[None, :], a)
    kx = _lanczos_kernel(fx[:, None] - offsets[None, :], a)

    # Normalize
    ky /= ky.sum(axis=1, keepdims=True)
    kx /= kx.sum(axis=1, keepdims=True)

    # Row and column indices
    row_idx = np.clip(iy[:, None] + offsets[None, :], 0, ny - 1)
    col_idx = np.clip(ix[:, None] + offsets[None, :], 0, nx - 1)

    # Apply separable kernel (vectorized: 2a iters instead of 4a²)
    vals = np.zeros(len(yr_v))
    for j in range(len(offsets)):
        row_pixels = image[row_idx[:, j][:, None], col_idx]  # (n_valid, 2a)
        vals += ky[:, j] * np.sum(kx * row_pixels, axis=1)

    result[valid] = vals
    return result


def _reproject_single_flags(image, wcs_in, wcs_out, shape_out):
    """Reproject a single integer flag image using nearest-neighbor sampling.

    Returns
    -------
    result : 2D integer array (0 where no data)
    footprint : 2D float array (1.0 where covered, 0.0 elsewhere)
    """
    ny_out, nx_out = shape_out
    image = np.asarray(image)
    dtype = image.dtype

    yy, xx = np.mgrid[0:ny_out, 0:nx_out]
    pixel_in = _pixel_to_pixel(wcs_out, wcs_in, xx.ravel().astype(float), yy.ravel().astype(float))
    ix = np.round(np.asarray(pixel_in[0])).astype(int)
    iy = np.round(np.asarray(pixel_in[1])).astype(int)

    ny_in, nx_in = image.shape
    valid = (ix >= 0) & (ix < nx_in) & (iy >= 0) & (iy < ny_in)

    if np.issubdtype(dtype, np.floating):
        result = np.full(ny_out * nx_out, np.nan, dtype=dtype)
    else:
        result = np.zeros(ny_out * nx_out, dtype=dtype)

    result[valid] = image[iy[valid], ix[valid]]

    footprint = valid.astype(np.float64).reshape(shape_out)
    return result.reshape(shape_out), footprint


def _reproject_chunk(
    image, wcs_in, wcs_out, row_start, row_end, nx_out, order, oversamp, area_ratio, sub_offsets
):
    """Reproject a horizontal chunk of rows.  Used by :func:`_reproject_single`.

    Returns
    -------
    row_start : int
    result : 2D array (NaN where no data)
    footprint : 2D float array (fractional coverage 0.0–1.0)
    """
    ny_chunk = row_end - row_start

    if oversamp <= 1:
        yy, xx = np.mgrid[row_start:row_end, 0:nx_out]
        pixel_in = _pixel_to_pixel(
            wcs_out, wcs_in, xx.ravel().astype(float), yy.ravel().astype(float)
        )
        coords = np.array([np.asarray(pixel_in[1]), np.asarray(pixel_in[0])])
        values = _lanczos_map_coordinates(image, coords, a=order)
        result = values.reshape(ny_chunk, nx_out) * area_ratio
        footprint = np.isfinite(result).astype(np.float64)
        return row_start, result, footprint

    chunk_shape = (ny_chunk, nx_out)
    accumulator = np.zeros(chunk_shape, dtype=np.float64)
    count = np.zeros(chunk_shape, dtype=np.int32)
    n_sub = len(sub_offsets) ** 2

    for dy_off in sub_offsets:
        for dx_off in sub_offsets:
            yy, xx = np.mgrid[row_start:row_end, 0:nx_out]
            pixel_out_x = (xx + dx_off).ravel().astype(float)
            pixel_out_y = (yy + dy_off).ravel().astype(float)

            pixel_in = _pixel_to_pixel(wcs_out, wcs_in, pixel_out_x, pixel_out_y)
            coords = np.array([np.asarray(pixel_in[1]), np.asarray(pixel_in[0])])
            values = _lanczos_map_coordinates(image, coords, a=order)
            vals_2d = values.reshape(chunk_shape)

            valid = np.isfinite(vals_2d)
            accumulator[valid] += vals_2d[valid]
            count[valid] += 1

    result = np.full(chunk_shape, np.nan, dtype=np.float64)
    good = count > 0
    result[good] = (accumulator[good] / count[good]) * area_ratio
    footprint = count.astype(np.float64) / n_sub
    return row_start, result, footprint


def _reproject_single(
    image, wcs_in, wcs_out, shape_out, order, conserve_flux, oversamp, parallel=False
):
    """Reproject a single image with Lanczos interpolation.

    Parameters
    ----------
    parallel : bool or int
        If True, use threads (number chosen automatically).
        If int > 1, use that many threads.

    Returns
    -------
    result : 2D array (NaN where no data)
    footprint : 2D float array (fractional coverage 0.0–1.0)
    """
    ny_out, nx_out = shape_out
    image = np.asarray(image, dtype=np.float64)

    # Pixel scale ratio
    scale_in = np.mean(proj_plane_pixel_scales(wcs_in))
    scale_out = np.mean(proj_plane_pixel_scales(wcs_out))
    scale_ratio = scale_out / scale_in  # >1 means output pixels larger

    # Auto oversampling
    if oversamp is None:
        oversamp = max(1, int(scale_ratio + 0.5))

    # Jacobian area ratio for flux conservation
    area_ratio = scale_ratio**2 if conserve_flux else 1.0

    # Sub-pixel offsets for oversampling
    if oversamp > 1:
        step = 1.0 / oversamp
        sub_offsets = np.arange(oversamp) * step + step / 2 - 0.5
    else:
        sub_offsets = None

    # Determine number of workers
    if parallel is True:
        n_workers = min(os.cpu_count() or 1, ny_out)
    elif isinstance(parallel, int) and parallel > 1:
        n_workers = min(parallel, ny_out)
    else:
        n_workers = 1

    if n_workers <= 1:
        # Sequential path
        _, result, footprint = _reproject_chunk(
            image, wcs_in, wcs_out, 0, ny_out, nx_out, order, oversamp, area_ratio, sub_offsets
        )
        return result, footprint

    # Threaded path — split by row chunks
    chunk_size = max(1, (ny_out + n_workers - 1) // n_workers)
    row_ranges = []
    for i in range(n_workers):
        rs = i * chunk_size
        re = min(rs + chunk_size, ny_out)
        if rs < re:
            row_ranges.append((rs, re))

    result = np.full(shape_out, np.nan, dtype=np.float64)
    footprint = np.zeros(shape_out, dtype=np.float64)
    with ThreadPoolExecutor(max_workers=len(row_ranges)) as pool:
        futures = [
            pool.submit(
                _reproject_chunk,
                image,
                wcs_in,
                wcs_out,
                rs,
                re,
                nx_out,
                order,
                oversamp,
                area_ratio,
                sub_offsets,
            )
            for rs, re in row_ranges
        ]
        for f in futures:
            row_start, chunk, fp_chunk = f.result()
            nrows = chunk.shape[0]
            result[row_start : row_start + nrows] = chunk
            footprint[row_start : row_start + nrows] = fp_chunk

    return result, footprint


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


[docs] def reproject_lanczos( input=None, wcs=None, shape=None, width=None, height=None, header=None, order=3, conserve_flux=True, oversamp=None, is_flags=False, use_nans=True, parallel=False, return_footprint=False, verbose=False, ): """Reproject images using Lanczos interpolation with automatic oversampling. Implements SWarp-style oversampling (sub-pixel averaging when output pixels are larger than input pixels) and Jacobian area scaling for flux conservation. Accepts the same input format as :func:`reproject_swarp`: a list of ``(image, header/WCS)`` tuples or a list of FITS filenames. For multiple inputs the reprojected frames are averaged (simple coadd). Parameters ---------- input : list or tuple List of ``(image, header_or_wcs)`` tuples or FITS filenames. A single ``(image, header_or_wcs)`` tuple is also accepted (wrapped into a list automatically for reproject compatibility). wcs : `~astropy.wcs.WCS`, optional Output WCS. Ignored if *header* already contains WCS. shape : tuple, optional Output ``(height, width)``. width, height : int, optional Output dimensions (alternative to *shape*). header : `~astropy.io.fits.Header`, optional Output FITS header (overrides *wcs*/*shape*/*width*/*height*). order : int Lanczos kernel order (default 3). conserve_flux : bool If True (default), multiply by the Jacobian area ratio so that *total flux* is conserved. If False, *surface brightness* is conserved instead. oversamp : int or None Sub-pixel oversampling factor per axis. ``None`` (default) selects automatically: ``max(1, round(output_scale / input_scale))``. is_flags : bool If True, treat input as integer flag/mask images: use nearest-neighbor resampling (no interpolation) and bitwise AND for combining multiple inputs. Overrides *order*, *conserve_flux* and *oversamp*. use_nans : bool If True (default), fill regions with no input coverage with NaN (floating-point images) or ``0xFFFF`` (integer flag images). parallel : bool or int If True, use threads for parallel interpolation (number chosen automatically). If int > 1, use that many threads. Gives ~3-4x speedup on multi-core machines. return_footprint : bool If True, return ``(coadd, footprint)`` where *footprint* is a float array with values between 0.0 (no coverage) and 1.0 (full coverage). When oversampling is active, fractional values indicate partial sub-pixel coverage. Default is False for backward compatibility. verbose : bool or callable Logging control. Returns ------- coadd : 2D `~numpy.ndarray` or None Reprojected (and optionally coadded) image. footprint : 2D `~numpy.ndarray` Coverage map (only returned when ``return_footprint=True``). """ if input is None: input = [] # Accept a single (image, header/WCS) tuple for reproject compatibility if isinstance(input, tuple) and len(input) == 2 and isinstance(input[0], np.ndarray): input = [input] log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Resolve output geometry if header is not None: header = header.copy() else: header = fits.Header({'NAXIS': 2, 'BITPIX': -64, 'EQUINOX': 2000.0}) if wcs is not None and wcs.is_celestial: astrometry.clear_wcs(header) header += wcs.to_header(relax=True) if (width is None or height is None) and shape is not None: height, width = shape if width is not None: header['NAXIS1'] = width if height is not None: header['NAXIS2'] = height wcs_out = WCS(header) if not wcs_out.is_celestial: log("Can't reproject without target WCS") return None shape_out = (header['NAXIS2'], header['NAXIS1']) if is_flags: log('Input images will be handled as integer flags') # Collect input frames frames = [] for item in input: if isinstance(item, str): hdulist = fits.open(item) img = hdulist[0].data if not is_flags: img = img.astype(np.float64) wcs_in = WCS(hdulist[0].header) hdulist.close() else: img, hdr_or_wcs = item if not is_flags: img = np.asarray(img, dtype=np.float64) if isinstance(hdr_or_wcs, WCS): wcs_in = hdr_or_wcs else: wcs_in = WCS(hdr_or_wcs) frames.append((img, wcs_in)) if not frames: log("No input frames") if return_footprint: return None, None return None # Reproject each frame results = [] footprints = [] for i, (img, wcs_in) in enumerate(frames): log('Reprojecting frame %d/%d' % (i + 1, len(frames))) if is_flags: result, fp = _reproject_single_flags(img, wcs_in, wcs_out, shape_out) else: result, fp = _reproject_single( img, wcs_in, wcs_out, shape_out, order, conserve_flux, oversamp, parallel=parallel ) results.append(result) footprints.append(fp) # Coadd if is_flags: # Bitwise AND of all frames (like SWarp COMBINE_TYPE=AND) coadd = results[0].copy() for r in results[1:]: coadd &= r # Combined footprint: covered if any frame covers the pixel footprint = np.maximum.reduce(footprints) if use_nans: # For integer flags there's no NaN; use 0xFFFF sentinel # Determine uncovered pixels from first frame (zeros where no data) # We can't reliably distinguish "flag value 0" from "no coverage" # for a single frame, but for consistency with reproject_swarp # we leave the result as-is (nearest-neighbor always maps somewhere # unless the output pixel falls outside the input footprint). pass else: if len(results) == 1: coadd = results[0] footprint = footprints[0] else: stack = np.array(results) valid = np.isfinite(stack) count = valid.sum(axis=0) coadd = np.full(shape_out, np.nan, dtype=np.float64) good = count > 0 coadd[good] = np.nansum(stack[:, good], axis=0) / count[good] # Average footprint across frames footprint = np.mean(footprints, axis=0) if return_footprint: return coadd, footprint return coadd
[docs] def reproject_swarp( input=None, wcs=None, shape=None, width=None, height=None, header=None, extra=None, is_flags=False, use_nans=True, get_weights=False, _workdir=None, _tmpdir=None, _exe=None, verbose=False, ): """ Wrapper for running SWarp for re-projecting and mosaicking of images onto target WCS grid. It accepts as input either list of filenames, or list of tuples where first element is an image, and second one - either FITS header or WCS. If the input images are integer flags, set `is_flags=True` so that it will be handled by passing `RESAMPLING_TYPE=FLAGS` and `COMBINE_TYPE=AND`. If `use_nans=True`, the regions with zero weights will be filled with NaNs (or 0xFFFF). Any additional configuration parameter may be passed to SWarp through `extra` argument which should be the dictionary with parameter names as keys. """ if input is None: input = [] if extra is None: extra = {} # Simple wrapper around print for logging in verbose mode only log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None # Find the binary binname = None if _exe is not None: # Check user-provided binary path, and fail if not found if os.path.isfile(_exe): binname = _exe else: # Find SWarp binary in common paths for exe in ['swarp']: binname = shutil.which(exe) if binname is not None: break if binname is None: log("Can't find SWarp binary") return None # else: # log("Using SWarp binary at", binname) if (width is None or height is None) and shape is not None: height, width = shape if header is None: # Construct minimal FITS header header = fits.Header( { 'NAXIS': 2, 'NAXIS1': width, 'NAXIS2': height, 'BITPIX': -64, 'EQUINOX': 2000.0, } ) else: header = header.copy() if wcs is not None and wcs.is_celestial: # Add WCS information to the header astrometry.clear_wcs(header) whdr = wcs.to_header(relax=True) if wcs.sip is not None: whdr = astrometry.wcs_sip2pv(whdr) # Here we will try to fix some common problems with WCS not supported by SWarp # FIXME: handle SIP distortions! if wcs.wcs.has_pc() and 'PC1_1' not in whdr: pc = wcs.wcs.get_pc() whdr['PC1_1'] = pc[0, 0] whdr['PC1_2'] = pc[0, 1] whdr['PC2_1'] = pc[1, 0] whdr['PC2_2'] = pc[1, 1] header += whdr else: wcs = WCS(header) if wcs is None or not wcs.is_celestial: log("Can't re-project without target WCS") return None workdir = _workdir if _workdir is not None else tempfile.mkdtemp(prefix='swarp', dir=_tmpdir) # Output coadd filename coaddname = os.path.join(workdir, 'coadd.fits') if os.path.exists(coaddname): os.unlink(coaddname) # Input header filename - the result will be re-projected to it headername = os.path.join(workdir, 'coadd.head') utils.file_write(headername, header.tostring(endcard=True, sep='\n')) # Output weights filename weightsname = os.path.join(workdir, 'coadd.weights.fits') # Dummy config filename, to prevent loading from current dir confname = os.path.join(workdir, 'empty.conf') utils.file_write(confname) xmlname = os.path.join(workdir, 'swarp.xml') opts = { 'VERBOSE_TYPE': 'QUIET' if not verbose else 'NORMAL', 'IMAGEOUT_NAME': coaddname, 'WEIGHTOUT_NAME': weightsname, 'c': confname, 'XML_NAME': xmlname, 'VMEM_DIR': workdir, 'RESAMPLE_DIR': workdir, # 'SUBTRACT_BACK': False, # Do not subtract the backgrounds 'FSCALASTRO_TYPE': 'VARIABLE', # and not re-scale the images by default } if is_flags: log('The images will be handled as integer flags') opts['RESAMPLING_TYPE'] = 'FLAGS' opts['COMBINE_TYPE'] = 'AND' # Use only common flags in overlapping masks opts.update(extra) # Handle input data filenames = [] bzero = 0 for i, item in enumerate(input): if isinstance(item, str): # Item is filename already filename = item elif len(item) == 2: # It should be a tuple of image plus header or WCS image = item[0] header = item[1] if image.dtype.name == 'bool': image = image.astype(np.int16) if isinstance(header, WCS): header = header.to_header(relax=True) # Convert SIP headers to TPV if WCS(header).sip is not None: header = astrometry.wcs_sip2pv(header) filename = os.path.join(workdir, 'image_%04d.fits' % i) fits.writeto(filename, image, header, overwrite=True) filenames.append(filename) bzero = max( bzero, fits.getheader(filename).get('BZERO', 0) ) # Keep the largest BZERO among input files # Build the command line command = ( binname + ' ' + utils.format_astromatic_opts(opts) + ' ' + ' '.join([shlex.quote(_) for _ in filenames]) ) if not verbose: command += ' > /dev/null 2>/dev/null' log('Will run SWarp like that:') log(command) # Run the command! t0 = time.time() res = os.system(command) t1 = time.time() if res == 0 and os.path.exists(coaddname) and os.path.exists(weightsname): log('SWarp run successfully in %.2f seconds' % (t1 - t0)) coadd = fits.getdata(coaddname) weights = fits.getdata(weightsname) # it seems SWarp adds BZERO to the output if inputs had them (e.g. unsigned ints do) # FIXME: this point needs further investigation! if np.issubdtype(coadd.dtype.type, int): coadd -= bzero if use_nans: if np.issubdtype(coadd.dtype, np.floating): coadd[weights == 0] = np.nan else: coadd[weights == 0] = 0xFFFF else: log('Error', res, 'running SWarp') coadd = None weights = None if _workdir is None: shutil.rmtree(workdir) if get_weights: return coadd, weights else: return coadd