"""
Image simulation module for generating realistic stellar fields with contaminants.
This module provides functions for creating simulated astronomical images with:
- Stars with Gaussian and Moffat PSF (pixel-integrated)
- Extended sources (galaxies with Sersic profiles)
- Imaging artifacts (cosmic rays, hot pixels, bad columns, satellite trails)
- Diffraction spikes and optical ghosts for bright sources
Designed for real-bogus classifier development and photometry testing.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import numpy as np
from astropy.table import Table
from scipy.ndimage import rotate, gaussian_filter
[docs]
def create_psf_model(
fwhm=3.0,
psf_type='gaussian',
beta=2.5,
size=None,
oversampling=2,
defocus=0.0,
astigmatism_x=0.0,
astigmatism_y=0.0,
coma_x=0.0,
coma_y=0.0,
wavelength=550e-9,
pupil_diameter=1.0,
):
"""
Create an oversampled PSF model compatible with PSFEx format.
This creates a PSF model that can be used with psf.place_psf_stamp() and other
STDPipe routines. Uses oversampling for accurate flux conservation and fast
evaluation with sub-pixel positioning.
Optical aberrations can be added via Zernike polynomial coefficients (Noll ordering).
When any aberration coefficient is non-zero, a diffraction PSF is computed via
Fourier optics and convolved with the atmospheric seeing PSF (Gaussian or Moffat).
When all coefficients are zero, the existing analytical path is used unchanged.
Parameters
----------
fwhm : float, optional
Full width at half maximum in pixels.
psf_type : str, optional
Type of PSF model ('gaussian' or 'moffat').
beta : float, optional
Moffat beta parameter (default 2.5, only used for psf_type='moffat').
size : int, optional
Output stamp size in pixels (after downsampling). If None, automatically
sized to capture ~99% of PSF flux (>=8*FWHM).
oversampling : int, optional
Oversampling factor (default 2).
defocus : float, optional
Zernike Z4 defocus coefficient in waves (default 0.0).
astigmatism_x : float, optional
Zernike Z5 oblique astigmatism coefficient in waves (default 0.0).
astigmatism_y : float, optional
Zernike Z6 vertical astigmatism coefficient in waves (default 0.0).
coma_x : float, optional
Zernike Z7 vertical coma coefficient in waves (default 0.0).
coma_y : float, optional
Zernike Z8 horizontal coma coefficient in waves (default 0.0).
wavelength : float, optional
Observation wavelength in meters (default 550e-9, affects diffraction scale).
pupil_diameter : float, optional
Pupil diameter in meters (default 1.0, affects diffraction scale).
Returns
-------
dict
Dictionary with PSFEx-compatible structure containing the PSF model.
"""
# Auto-size to capture full PSF if not specified
# For Gaussian/Moffat PSF, significant flux extends to ~4 FWHM from center
# Use 8*FWHM total size to capture ~99% of flux
if size is None:
size = int(np.ceil(fwhm * 8))
if size % 2 == 0:
size += 1
# Oversampled data grid following PSFEx convention:
# - data array has size * oversampling pixels per side
# - sampling = 1/oversampling (image pixels per data pixel; <1 means oversampled)
# - get_psf_stamp() output size: floor(width * sampling / 2) * 2 + 1 = size (constant)
psf_data_size = size * oversampling
psf_sampling = 1.0 / oversampling # PSFEx convention: image pixels per data pixel
psf_width = float(psf_data_size) # Data array dimension
psf_height = float(psf_data_size)
# Center of the oversampled grid
center = (psf_data_size - 1) / 2.0
# Create coordinate grid in data-pixel space
y, x = np.mgrid[0:psf_data_size, 0:psf_data_size]
if psf_type.lower() == 'gaussian':
# Pixel-integrated Gaussian PSF using error function
# This properly accounts for flux integrated over each data pixel area
# Data pixel [i-0.5, i+0.5] covers image space [(i-0.5)*sampling, (i+0.5)*sampling]
from scipy.special import erf
sigma = fwhm / (2 * np.sqrt(2 * np.log(2))) # in image pixels
sqrt2_sigma = np.sqrt(2) * sigma
# X-direction: integrate Gaussian over each data pixel, converting to image space
erf_xp = erf((x + 0.5 - center) * psf_sampling / sqrt2_sigma)
erf_xm = erf((x - 0.5 - center) * psf_sampling / sqrt2_sigma)
integral_x = 0.5 * (erf_xp - erf_xm)
# Y-direction integration
erf_yp = erf((y + 0.5 - center) * psf_sampling / sqrt2_sigma)
erf_ym = erf((y - 0.5 - center) * psf_sampling / sqrt2_sigma)
integral_y = 0.5 * (erf_yp - erf_ym)
# Combined 2D pixel-integrated PSF
psf_data = integral_x * integral_y
elif psf_type.lower() == 'moffat':
# Moffat PSF with pixel integration
# For Moffat, analytical pixel integration is complex, so we use
# supersampling for accurate pixel integration
alpha = fwhm / (2 * np.sqrt(2 ** (1.0 / beta) - 1))
# Use 5x supersampling for pixel integration (vectorized)
# Offsets are in data-pixel space; convert to image space with psf_sampling
supersample = 5
offsets = np.linspace(-0.5 + 0.5 / supersample, 0.5 - 0.5 / supersample, supersample)
psf_data = np.zeros((psf_data_size, psf_data_size))
for dy in offsets:
for dx in offsets:
r = np.sqrt(
((x + dx - center) * psf_sampling) ** 2
+ ((y + dy - center) * psf_sampling) ** 2
)
psf_data += 1.0 / (1 + (r / alpha) ** 2) ** beta
psf_data /= supersample**2
else:
raise ValueError(f"Unknown psf_type '{psf_type}'. Supported types: 'gaussian', 'moffat'")
# Normalize
psf_data /= np.sum(psf_data)
# Apply optical aberrations if any are non-zero
has_aberrations = any([defocus, astigmatism_x, astigmatism_y, coma_x, coma_y])
if has_aberrations:
from scipy.signal import fftconvolve
# Build pupil grid in polar coordinates on the unit disk
N = psf_data_size
coords = np.linspace(-1, 1, N)
xx, yy = np.meshgrid(coords, coords)
rho = np.sqrt(xx**2 + yy**2)
theta = np.arctan2(yy, xx)
aperture = (rho <= 1.0).astype(float)
# Wavefront from Noll-ordered Zernike polynomials Z4-Z8
W = np.zeros_like(rho)
if defocus:
# Z4: sqrt(3) * (2*rho^2 - 1)
W += defocus * np.sqrt(3) * (2 * rho**2 - 1)
if astigmatism_x:
# Z5: sqrt(6) * rho^2 * sin(2*theta)
W += astigmatism_x * np.sqrt(6) * rho**2 * np.sin(2 * theta)
if astigmatism_y:
# Z6: sqrt(6) * rho^2 * cos(2*theta)
W += astigmatism_y * np.sqrt(6) * rho**2 * np.cos(2 * theta)
if coma_x:
# Z7: sqrt(8) * (3*rho^3 - 2*rho) * sin(theta)
W += coma_x * np.sqrt(8) * (3 * rho**3 - 2 * rho) * np.sin(theta)
if coma_y:
# Z8: sqrt(8) * (3*rho^3 - 2*rho) * cos(theta)
W += coma_y * np.sqrt(8) * (3 * rho**3 - 2 * rho) * np.cos(theta)
# Scale wavefront by wavelength ratio to make PSF wavelength-dependent
# Reference wavelength is 550nm; different wavelengths scale the
# effective aberration strength (shorter wavelength = stronger effect)
wavelength_ref = 550e-9
W *= wavelength_ref / wavelength
# Complex pupil function
pupil = aperture * np.exp(2j * np.pi * W)
# Diffraction PSF via FFT
E = np.fft.fftshift(np.fft.fft2(np.fft.fftshift(pupil)))
psf_diffraction = np.abs(E) ** 2
psf_diffraction /= np.sum(psf_diffraction)
# Convolve diffraction PSF with seeing PSF (already computed as psf_data)
psf_data = fftconvolve(psf_diffraction, psf_data, mode='same')
psf_data /= np.sum(psf_data)
psf_type_name = psf_type.lower() + '_aberrated'
else:
psf_type_name = psf_type.lower()
# Create PSFEx-compatible structure
# Add polynomial dimension (ncoeffs=1 for constant PSF)
psf_data_3d = psf_data[np.newaxis, :, :]
psf_model = {
'data': psf_data_3d,
'width': psf_width,
'height': psf_height,
'sampling': psf_sampling,
'fwhm': fwhm,
'degree': 0, # Constant PSF (no position dependence)
'ncoeffs': 1,
'x0': 0.0, # Reference position
'y0': 0.0,
'sx': 1.0, # Scaling factors
'sy': 1.0,
'psf_type': psf_type_name,
'beta': beta if psf_type.lower() == 'moffat' else None,
}
if has_aberrations:
psf_model['aberrations'] = {
'defocus': defocus,
'astigmatism_x': astigmatism_x,
'astigmatism_y': astigmatism_y,
'coma_x': coma_x,
'coma_y': coma_y,
'wavelength': wavelength,
'pupil_diameter': pupil_diameter,
}
return psf_model
[docs]
def create_sersic_profile(
size, x0, y0, amplitude=1.0, r_eff=5.0, n=1.0, ellipticity=0.0, position_angle=0.0
):
"""
Create a Sersic profile for galaxy simulation.
The Sersic profile is: I(r) = amplitude * exp(-b_n * ((r/r_eff)^(1/n) - 1))
where b_n ≈ 2n - 0.324 for n >= 0.5 (more accurate formula used internally)
Common cases:
- n=0.5: Gaussian-like profile
- n=1.0: Exponential disk (typical spiral galaxy disk)
- n=4.0: de Vaucouleurs profile (typical elliptical galaxy)
Parameters
----------
size : int
Size of the profile stamp (must be odd).
x0 : float
X center position within the stamp.
y0 : float
Y center position within the stamp.
amplitude : float, optional
Central surface brightness amplitude.
r_eff : float, optional
Effective radius in pixels (half-light radius).
n : float, optional
Sersic index (shape parameter).
ellipticity : float, optional
Ellipticity (0 = circular, 0.5 = moderate, 0.9 = very elongated).
position_angle : float, optional
Position angle in degrees (0 = vertical, increases CCW).
Returns
-------
numpy.ndarray
2D array of shape (size, size) with Sersic profile.
"""
# Compute b_n parameter using more accurate formula
# For n >= 0.5: b_n ≈ 2n - 1/3 + 4/(405n) + 46/(25515n^2) + ...
# For simplicity, use: b_n ≈ 1.9992n - 0.3271 for n > 0.5
if n > 0.5:
b_n = 1.9992 * n - 0.3271
else:
b_n = 0.01 # Very broad profile
# Create coordinate grid
y, x = np.mgrid[0:size, 0:size]
x = x - x0
y = y - y0
# Apply rotation
if position_angle != 0:
theta = np.radians(position_angle)
x_rot = x * np.cos(theta) + y * np.sin(theta)
y_rot = -x * np.sin(theta) + y * np.cos(theta)
x, y = x_rot, y_rot
# Apply ellipticity: convert ellipticity to axis ratio
# ellipticity = 1 - b/a, so b/a = 1 - ellipticity
q = 1 - ellipticity # Axis ratio (minor/major)
if q <= 0:
q = 0.01 # Avoid division by zero
# Elliptical radius
r = np.sqrt(x**2 + (y / q) ** 2)
# Sersic profile
profile = amplitude * np.exp(-b_n * ((r / r_eff) ** (1.0 / n) - 1))
return profile
[docs]
def place_galaxy(image, x0, y0, flux, r_eff=5.0, n=1.0, ellipticity=0.0, position_angle=0.0):
"""
Place a galaxy with Sersic profile into an image.
The galaxy stamp is created, scaled to the requested flux, and added to the image.
Similar to psf.place_psf_stamp() but for extended sources.
The image is modified in-place.
Parameters
----------
image : numpy.ndarray
Target image (2D array), modified in-place.
x0 : float
X coordinate where to place the galaxy center.
y0 : float
Y coordinate where to place the galaxy center.
flux : float
Total integrated flux in ADU units.
r_eff : float, optional
Effective radius in pixels.
n : float, optional
Sersic index.
ellipticity : float, optional
Ellipticity (0 = circular, 1 = line).
position_angle : float, optional
Position angle in degrees.
"""
# Determine stamp size (should contain most of the flux)
# For Sersic profiles, most flux is within ~5-8 * r_eff
size = int(np.ceil(r_eff * 8)) * 2 + 1
# Integer coordinates and sub-pixel offset
ix, iy = int(np.round(x0)), int(np.round(y0))
dx_sub = x0 - ix
dy_sub = y0 - iy
# Create galaxy stamp centered with sub-pixel offset
stamp_x0 = size // 2 + dx_sub
stamp_y0 = size // 2 + dy_sub
stamp = create_sersic_profile(
size, stamp_x0, stamp_y0, 1.0, r_eff, n, ellipticity, position_angle
)
# Normalize and scale to requested flux
stamp_sum = np.sum(stamp)
if stamp_sum > 0:
stamp = stamp / stamp_sum * flux
else:
return # Degenerate profile
# Integer coordinates inside the stamp
y, x = np.mgrid[0 : stamp.shape[0], 0 : stamp.shape[1]]
# Corresponding image pixels
y1, x1 = np.mgrid[0 : stamp.shape[0], 0 : stamp.shape[1]]
x1 += int(np.round(x0) - np.floor(stamp.shape[1] / 2))
y1 += int(np.round(y0) - np.floor(stamp.shape[0] / 2))
# Crop coordinates outside target image
idx = np.isfinite(stamp)
idx &= (x1 >= 0) & (x1 < image.shape[1])
idx &= (y1 >= 0) & (y1 < image.shape[0])
# Add the stamp to the image
image[y1[idx], x1[idx]] += stamp[y[idx], x[idx]]
[docs]
def create_cosmic_ray(length, width, angle, max_intensity, profile='sharp'):
"""
Create a cosmic ray track.
Cosmic rays appear as elongated tracks with sharp edges, oriented randomly.
Parameters
----------
length : float
Length of the track in pixels.
width : float
Width of the track in pixels.
angle : float
Angle in degrees (0 = horizontal, increases CCW).
max_intensity : float
Peak intensity in ADU.
profile : str, optional
Intensity profile ('sharp', 'tapered', 'worm').
Returns
-------
dict
Dictionary with 'stamp' (2D array), 'size' (stamp dimensions).
"""
# Create a horizontal track on an oversized canvas
canvas_size = int(np.ceil(max(length, width) * 3))
canvas = np.zeros((canvas_size, canvas_size))
# Center of canvas
cx, cy = canvas_size // 2, canvas_size // 2
# Create horizontal track
track_length = int(np.ceil(length))
track_width = int(np.ceil(width))
if profile == 'sharp':
# Sharp-edged rectangular track
y_start = cy - track_width // 2
y_end = y_start + track_width
x_start = cx - track_length // 2
x_end = x_start + track_length
canvas[y_start:y_end, x_start:x_end] = max_intensity
elif profile == 'tapered':
# Tapered track (intensity decreases toward ends, symmetric)
y_start = cy - track_width // 2
y_end = y_start + track_width
x_start = cx - track_length // 2
x_end = x_start + track_length
# Symmetric taper: ramps from 0.2 at edges to 1.0 at center
taper = np.linspace(-1.0, 1.0, track_length)
taper = 0.2 + 0.8 * (1.0 - np.abs(taper))
canvas[y_start:y_end, x_start:x_end] = max_intensity * taper[None, :]
elif profile == 'worm':
# Worm-like track with slight curvature
x_coords = np.arange(track_length) - track_length // 2 + cx
# Add sinusoidal variation
y_variation = np.sin(np.linspace(0, 2 * np.pi, track_length)) * (track_width * 1)
y_coords = y_variation + cy
for i, (x, y) in enumerate(zip(x_coords, y_coords)):
ix, iy = int(np.round(x)), int(np.round(y))
# Draw small blob at each position
for dy in range(-track_width // 2, -track_width // 2 + track_width):
for dx in range(-1, 2):
if i + dx < 0 or i + dx >= track_length:
continue
py, px = iy + dy, ix + dx
if 0 <= py < canvas_size and 0 <= px < canvas_size:
canvas[py, px] = max_intensity
else:
raise ValueError(f"Unknown profile '{profile}'. Use 'sharp', 'tapered', or 'worm'")
# Rotate the canvas
if angle != 0:
# Cosmics are supposed to be sharp so let's rotate without interpolation
canvas = rotate(canvas, angle, reshape=False, order=0)
# Crop to minimal bounding box
nonzero = np.nonzero(canvas > 0.01 * max_intensity)
if len(nonzero[0]) == 0:
return {'stamp': np.array([[max_intensity]]), 'size': (1, 1)}
y_min, y_max = nonzero[0].min(), nonzero[0].max()
x_min, x_max = nonzero[1].min(), nonzero[1].max()
stamp = canvas[y_min : y_max + 1, x_min : x_max + 1]
return {'stamp': stamp, 'size': stamp.shape}
[docs]
def add_cosmic_rays(
image,
n_rays=5,
length_range=(5, 50),
width_range=(1, 3),
intensity_range=(1000, 10000),
profile='sharp',
):
"""
Add cosmic ray tracks to an image.
Cosmic rays are placed at random positions with random orientations.
Parameters
----------
image : numpy.ndarray
Target image (modified in-place).
n_rays : int, optional
Number of cosmic rays to add.
length_range : tuple, optional
(min, max) length in pixels.
width_range : tuple, optional
(min, max) width in pixels.
intensity_range : tuple, optional
(min, max) peak intensity in ADU.
profile : str, optional
Intensity profile ('sharp', 'tapered', 'worm').
Returns
-------
astropy.table.Table
Catalog of added cosmic rays with columns: x, y, length, width, angle, intensity, type.
"""
height, width = image.shape
rays = []
for i in range(n_rays):
# Random parameters
length = np.random.uniform(*length_range)
ray_width = np.random.uniform(*width_range)
angle = np.random.uniform(0, 180)
intensity = np.random.uniform(*intensity_range)
# Random position
x0 = np.random.uniform(0, width)
y0 = np.random.uniform(0, height)
# Create cosmic ray stamp
cr = create_cosmic_ray(length, ray_width, angle, intensity, profile)
stamp = cr['stamp']
# Place stamp on image
ix, iy = int(np.round(x0)), int(np.round(y0))
sy, sx = stamp.shape
# Stamp bounds in image coordinates
x_start = ix - sx // 2
y_start = iy - sy // 2
# Clip to image boundaries
x_img_start = max(0, x_start)
y_img_start = max(0, y_start)
x_img_end = min(width, x_start + sx)
y_img_end = min(height, y_start + sy)
# Corresponding stamp coordinates
x_stamp_start = x_img_start - x_start
y_stamp_start = y_img_start - y_start
x_stamp_end = x_stamp_start + (x_img_end - x_img_start)
y_stamp_end = y_stamp_start + (y_img_end - y_img_start)
# Add to image
if x_img_end > x_img_start and y_img_end > y_img_start:
image[y_img_start:y_img_end, x_img_start:x_img_end] += stamp[
y_stamp_start:y_stamp_end, x_stamp_start:x_stamp_end
]
rays.append(
{
'x': x0,
'y': y0,
'length': length,
'width': ray_width,
'angle': angle,
'intensity': intensity,
'type': 'cosmic_ray',
'is_real': False,
}
)
return Table(rays)
[docs]
def add_hot_pixels(
image, n_pixels=10, intensity_range=(500, 5000), clustering=False, cluster_size=3
):
"""
Add hot pixels to an image.
Hot pixels are single bright pixels at random locations, optionally clustered
to simulate physical detector defects.
Parameters
----------
image : numpy.ndarray
Target image (modified in-place).
n_pixels : int, optional
Number of hot pixels to add.
intensity_range : tuple, optional
(min, max) intensity in ADU.
clustering : bool, optional
If True, create clusters of hot pixels.
cluster_size : int, optional
Number of pixels per cluster (if clustering=True).
Returns
-------
astropy.table.Table
Catalog of added hot pixels with columns: x, y, intensity, type.
"""
height, width = image.shape
pixels = []
if not clustering:
# Independent hot pixels
for i in range(n_pixels):
x = np.random.randint(0, width)
y = np.random.randint(0, height)
intensity = np.random.uniform(*intensity_range)
image[y, x] += intensity
pixels.append(
{'x': x, 'y': y, 'intensity': intensity, 'type': 'hot_pixel', 'is_real': False}
)
else:
# Clustered hot pixels — produce exactly n_pixels total
cluster_size = max(1, cluster_size)
n_clusters = max(1, int(np.ceil(n_pixels / cluster_size)))
placed = 0
for i in range(n_clusters):
# Cluster center
cx = np.random.randint(0, width)
cy = np.random.randint(0, height)
# Add pixels around center (stop at n_pixels total)
for j in range(cluster_size):
if placed >= n_pixels:
break
x = cx + np.random.randint(-2, 3)
y = cy + np.random.randint(-2, 3)
# Clip to image boundaries
x = np.clip(x, 0, width - 1)
y = np.clip(y, 0, height - 1)
intensity = np.random.uniform(*intensity_range)
image[y, x] += intensity
pixels.append(
{'x': x, 'y': y, 'intensity': intensity, 'type': 'hot_pixel', 'is_real': False}
)
placed += 1
return Table(pixels)
[docs]
def add_bad_columns(
image, n_columns=2, intensity_range=None, bad_type='dead', orientation='vertical'
):
"""
Add bad columns or rows to an image.
Bad columns can be dead (low/zero values), hot (high values), or noisy (high variance).
Parameters
----------
image : numpy.ndarray
Target image (modified in-place).
n_columns : int, optional
Number of bad columns/rows to add.
intensity_range : tuple, optional
(min, max) intensity for 'hot' type. Ignored for 'dead' and 'noisy'.
bad_type : str, optional
Type of bad column ('dead', 'hot', 'noisy').
orientation : str, optional
'vertical' for columns, 'horizontal' for rows.
Returns
-------
astropy.table.Table
Catalog of bad columns with columns: position, bad_type, orientation, type.
"""
height, width = image.shape
columns = []
for i in range(n_columns):
if orientation == 'vertical':
# Bad column
col = np.random.randint(0, width)
if bad_type == 'dead':
image[:, col] = 0
elif bad_type == 'hot':
if intensity_range is None:
intensity_range = (5000, 20000)
intensity = np.random.uniform(*intensity_range)
image[:, col] = intensity
elif bad_type == 'noisy':
noise = np.random.normal(0, 1000, height)
image[:, col] += noise
else:
raise ValueError(f"Unknown bad_type '{bad_type}'. Use 'dead', 'hot', or 'noisy'")
columns.append(
{
'position': col,
'bad_type': bad_type,
'orientation': orientation,
'type': 'bad_column',
'is_real': False,
}
)
elif orientation == 'horizontal':
# Bad row
row = np.random.randint(0, height)
if bad_type == 'dead':
image[row, :] = 0
elif bad_type == 'hot':
if intensity_range is None:
intensity_range = (5000, 20000)
intensity = np.random.uniform(*intensity_range)
image[row, :] = intensity
elif bad_type == 'noisy':
noise = np.random.normal(0, 1000, width)
image[row, :] += noise
else:
raise ValueError(f"Unknown bad_type '{bad_type}'. Use 'dead', 'hot', or 'noisy'")
columns.append(
{
'position': row,
'bad_type': bad_type,
'orientation': orientation,
'type': 'bad_row',
'is_real': False,
}
)
return Table(columns)
[docs]
def create_satellite_trail(length, width, intensity, angle=None, profile='linear', tumbling=False):
"""
Create a satellite trail.
Satellite trails are long, thin, bright streaks caused by satellites passing through the field.
Parameters
----------
length : float
Length of the trail in pixels.
width : float
Width of the trail in pixels.
intensity : float
Peak intensity in ADU.
angle : float, optional
Angle in degrees (0 = horizontal). If None, random.
profile : str, optional
Transverse profile ('linear', 'gaussian').
tumbling : bool, optional
If True, add intensity variations along the trail (tumbling satellite).
Returns
-------
dict
Dictionary with 'stamp' (2D array), 'angle' (actual angle used).
"""
if angle is None:
angle = np.random.uniform(0, 180)
# Create horizontal trail on oversized canvas
canvas_size = int(np.ceil(max(length, width) * 3))
canvas = np.zeros((canvas_size, canvas_size))
cx, cy = canvas_size // 2, canvas_size // 2
track_length = int(np.ceil(length))
track_width = int(np.ceil(width))
if profile == 'linear':
# Rectangular profile
y_start = cy - track_width // 2
y_end = y_start + track_width
x_start = cx - track_length // 2
x_end = x_start + track_length
canvas[y_start:y_end, x_start:x_end] = intensity
elif profile == 'gaussian':
# Gaussian transverse profile
x_coords = np.arange(cx - track_length // 2, cx + track_length // 2)
for x in x_coords:
for dy in range(-track_width * 2, track_width * 2 + 1):
y = cy + dy
if 0 <= y < canvas_size and 0 <= x < canvas_size:
gauss = np.exp(-0.5 * (dy / (track_width / 2.355)) ** 2)
canvas[y, x] += intensity * gauss
else:
raise ValueError(f"Unknown profile '{profile}'. Use 'linear' or 'gaussian'")
# Add tumbling variations
if tumbling:
x_start = cx - track_length // 2
x_end = x_start + track_length
variation = 0.5 + 0.5 * np.sin(np.linspace(0, 4 * np.pi, track_length))
y_start = cy - track_width // 2
y_end = y_start + track_width
if x_end - x_start == len(variation):
canvas[y_start:y_end, x_start:x_end] *= variation[None, :]
# Rotate
if angle != 0:
canvas = rotate(canvas, angle, reshape=False, order=1)
# Crop to bounding box
nonzero = np.nonzero(canvas > 0.01 * intensity)
if len(nonzero[0]) == 0:
return {'stamp': np.array([[intensity]]), 'angle': angle, 'size': (1, 1)}
y_min, y_max = nonzero[0].min(), nonzero[0].max()
x_min, x_max = nonzero[1].min(), nonzero[1].max()
stamp = canvas[y_min : y_max + 1, x_min : x_max + 1]
return {'stamp': stamp, 'angle': angle, 'size': stamp.shape}
[docs]
def add_satellite_trails(
image,
n_trails=1,
length_range=(100, 400),
width_range=(2, 8),
intensity_range=(5000, 20000),
profile='linear',
tumbling_prob=0.2,
):
"""
Add satellite trails to an image.
Parameters
----------
image : numpy.ndarray
Target image (modified in-place).
n_trails : int, optional
Number of satellite trails to add.
length_range : tuple, optional
(min, max) length in pixels.
width_range : tuple, optional
(min, max) width in pixels.
intensity_range : tuple, optional
(min, max) peak intensity in ADU.
profile : str, optional
Transverse profile ('linear', 'gaussian').
tumbling_prob : float, optional
Probability of tumbling satellite (intensity variations).
Returns
-------
astropy.table.Table
Catalog of added trails.
"""
height, width = image.shape
trails = []
for i in range(n_trails):
length = np.random.uniform(*length_range)
trail_width = np.random.uniform(*width_range)
intensity = np.random.uniform(*intensity_range)
tumbling = np.random.random() < tumbling_prob
# Random position
x0 = np.random.uniform(0, width)
y0 = np.random.uniform(0, height)
# Create trail
trail = create_satellite_trail(length, trail_width, intensity, None, profile, tumbling)
stamp = trail['stamp']
angle = trail['angle']
# Place on image
ix, iy = int(np.round(x0)), int(np.round(y0))
sy, sx = stamp.shape
x_start = ix - sx // 2
y_start = iy - sy // 2
x_img_start = max(0, x_start)
y_img_start = max(0, y_start)
x_img_end = min(width, x_start + sx)
y_img_end = min(height, y_start + sy)
x_stamp_start = x_img_start - x_start
y_stamp_start = y_img_start - y_start
x_stamp_end = x_stamp_start + (x_img_end - x_img_start)
y_stamp_end = y_stamp_start + (y_img_end - y_img_start)
if x_img_end > x_img_start and y_img_end > y_img_start:
image[y_img_start:y_img_end, x_img_start:x_img_end] += stamp[
y_stamp_start:y_stamp_end, x_stamp_start:x_stamp_end
]
trails.append(
{
'x': x0,
'y': y0,
'length': length,
'width': trail_width,
'angle': angle,
'intensity': intensity,
'tumbling': tumbling,
'type': 'satellite_trail',
'is_real': False,
}
)
return Table(trails)
[docs]
def create_diffraction_spikes(size, x0, y0, star_flux, n_spikes=4, spike_length=50, spike_width=2):
"""
Create diffraction spikes for a bright star.
Diffraction spikes are caused by the support structure of telescopes (refractors, SCTs).
Typically 4 spikes at 45-degree intervals.
Parameters
----------
size : int
Size of the stamp.
x0 : float
X center (star position).
y0 : float
Y center (star position).
star_flux : float
Flux of the source star (determines spike intensity).
n_spikes : int, optional
Number of spikes (typically 4).
spike_length : float, optional
Length of each spike in pixels.
spike_width : float, optional
Width of each spike in pixels.
Returns
-------
numpy.ndarray
2D array with diffraction spikes.
"""
stamp = np.zeros((size, size))
# Spike intensity proportional to star flux with power-law decline
spike_fraction = 0.01 # 1% of star flux goes into spikes
for i in range(n_spikes):
angle = i * (360.0 / n_spikes) # Evenly spaced angles
theta = np.radians(angle)
# Create spike along radial direction
for r in range(1, int(spike_length)):
# Radial power law: I(r) ∝ flux / r^2
intensity = spike_fraction * star_flux / (r**2 + 1)
# Position along spike
x = x0 + r * np.cos(theta)
y = y0 + r * np.sin(theta)
# Draw spike with finite width
# for dw in np.linspace(-spike_width / 2, spike_width / 2, int(spike_width) + 1):
for dw in np.linspace(
-np.floor(spike_width / 2), np.floor(spike_width / 2), int(2 * spike_width + 1)
):
xp = x + dw * np.sin(theta)
yp = y - dw * np.cos(theta)
ix, iy = int(np.round(xp)), int(np.round(yp))
if 0 <= ix < size and 0 <= iy < size:
stamp[iy, ix] += intensity
return stamp
[docs]
def create_optical_ghost(
size, x0, y0, source_flux, ghost_fraction=0.05, offset=(50, 50), blur_sigma=5.0
):
"""
Create an optical ghost (reflection artifact).
Optical ghosts are faint, blurred copies of bright sources caused by reflections
in the optical system.
Parameters
----------
size : int
Minimum size of the stamp (will be expanded if needed to fit ghost).
x0 : float
X center of original source within stamp.
y0 : float
Y center of original source within stamp.
source_flux : float
Flux of the original source.
ghost_fraction : float, optional
Fraction of source flux appearing in ghost (typically 0.01-0.1).
offset : tuple, optional
(dx, dy) offset of ghost from source in pixels.
blur_sigma : float, optional
Gaussian blur sigma for the ghost.
Returns
-------
dict
Dictionary with 'stamp' (2D array) and 'source_pos' (x, y) giving the
source position within the stamp (needed to align stamp onto the image).
"""
# Calculate required stamp size to accommodate both source and ghost.
# For negative offsets, the ghost lies at coordinates < source position,
# so we need to shift the origin so all coordinates are non-negative.
gx = x0 + offset[0]
gy = y0 + offset[1]
# Compute origin shift so the leftmost/topmost point is at the margin
margin = int(blur_sigma * 3) # 3-sigma margin for Gaussian blur
shift_x = max(0, margin - int(min(x0, gx)))
shift_y = max(0, margin - int(min(y0, gy)))
# Apply shift to both source and ghost coordinates
x0_s = x0 + shift_x
y0_s = y0 + shift_y
gx_s = gx + shift_x
gy_s = gy + shift_y
# Stamp must contain both positions plus margin on all sides
actual_width = max(size, int(max(x0_s, gx_s) + margin + 1))
actual_height = max(size, int(max(y0_s, gy_s) + margin + 1))
stamp = np.zeros((actual_height, actual_width))
# Place point source at ghost position
ix, iy = int(np.round(gx_s)), int(np.round(gy_s))
if 0 <= ix < actual_width and 0 <= iy < actual_height:
stamp[iy, ix] = source_flux * ghost_fraction
# Blur to simulate defocus
stamp = gaussian_filter(stamp, sigma=blur_sigma)
return {'stamp': stamp, 'source_pos': (x0_s, y0_s)}
[docs]
def add_close_companions_to_catalog(
catalog,
fwhm,
fraction=0.2,
min_separation_fwhm=1.0,
max_separation_fwhm=3.0,
flux_variation=(0.5, 1.5),
image_shape=None,
edge=10,
):
"""
Add close companions to stars in a catalog (before image placement).
This function creates a new catalog with additional companion stars placed
near existing stars to simulate crowded fields. Companions are only added
to sources with type='star'.
Parameters
----------
catalog : astropy.table.Table
Input catalog with 'x', 'y', 'flux', 'type' columns.
fwhm : float
FWHM of the PSF in pixels (used to calculate separations).
fraction : float, optional
Fraction of stars (0-1) that will have companions.
min_separation_fwhm : float, optional
Minimum separation in FWHM units.
max_separation_fwhm : float, optional
Maximum separation in FWHM units.
flux_variation : tuple, optional
(min, max) flux ratio for companion relative to primary.
image_shape : tuple, optional
(height, width) tuple for bounds checking, or None to skip.
edge : int, optional
Minimum distance from edges when bounds checking.
Returns
-------
astropy.table.Table
New catalog with companions added (original catalog unchanged).
"""
from astropy.table import vstack, Table
# Select only stars for companion addition
stars = catalog[catalog['type'] == 'star']
if len(stars) == 0 or fraction <= 0:
return catalog.copy()
# Randomly select stars to give companions
n_companions = int(len(stars) * fraction)
if n_companions == 0:
return catalog.copy()
companion_indices = np.random.choice(len(stars), size=n_companions, replace=False)
companions_added = []
min_separation = min_separation_fwhm * fwhm
max_separation = max_separation_fwhm * fwhm
for idx in companion_indices:
source = stars[idx]
# Random separation and position angle
separation = np.random.uniform(min_separation, max_separation)
angle = np.random.uniform(0, 2 * np.pi)
# Companion position
comp_x = source['x'] + separation * np.cos(angle)
comp_y = source['y'] + separation * np.sin(angle)
# Check if companion is within image bounds
if image_shape is not None:
height, width = image_shape
if not (edge < comp_x < width - edge and edge < comp_y < height - edge):
continue
# Companion flux (similar to primary, with variation)
comp_flux = source['flux'] * np.random.uniform(*flux_variation)
# Build companion entry matching the catalog structure
companion = {
'x': comp_x,
'y': comp_y,
'flux': comp_flux,
'type': 'star',
'is_real': True,
}
# Copy additional columns from source if they exist
for col in ['fwhm', 'psf_type']:
if col in source.colnames:
companion[col] = source[col]
companions_added.append(companion)
# Return extended catalog
if companions_added:
comp_table = Table(companions_added)
return vstack([catalog, comp_table])
else:
return catalog.copy()
[docs]
def add_stars(
image,
n=100,
flux_range=(100, 10000),
fwhm=3.0,
psf='gaussian',
beta=2.5,
edge=0,
saturation=None,
diffraction_spikes=False,
spike_threshold=50000,
optical_ghosts=False,
ghost_threshold=100000,
wcs=None,
):
"""
Add stars to an image with realistic PSF.
Parameters
----------
image : numpy.ndarray
Target image (modified in-place).
n : int, optional
Number of stars to add.
flux_range : tuple, optional
(min, max) flux range in ADU.
fwhm : float, optional
FWHM of the PSF in pixels (used if psf is a string).
psf : str or dict, optional
PSF specification — either a string ('gaussian', 'moffat') or a PSF model
dictionary (PSFEx format).
beta : float, optional
Moffat beta parameter (only used if psf='moffat').
edge : int, optional
Minimum distance from image edges.
saturation : float, optional
Saturation level in ADU.
diffraction_spikes : bool, optional
If True, add diffraction spikes to bright stars.
spike_threshold : float, optional
Flux threshold for adding diffraction spikes.
optical_ghosts : bool, optional
If True, add optical ghosts to very bright stars.
ghost_threshold : float, optional
Flux threshold for adding optical ghosts.
wcs : astropy.wcs.WCS, optional
WCS object for computing RA/Dec.
Returns
-------
astropy.table.Table
Catalog of added stars.
"""
from . import pipeline
from . import psf as psf_module
# Generate random star catalog
cat = pipeline.make_random_stars(
shape=image.shape,
nstars=n,
minflux=flux_range[0],
maxflux=flux_range[1],
edge=edge,
wcs=wcs,
)
# Determine PSF model
if isinstance(psf, str):
# Create oversampled PSF model once for all stars
psf_model = create_psf_model(fwhm=fwhm, psf_type=psf, beta=beta)
psf_type_name = psf
elif isinstance(psf, dict):
# User provided PSF model (e.g., from PSFEx)
psf_model = psf
psf_type_name = psf.get('psf_type', 'custom')
# Extract FWHM from model if not provided
if 'fwhm' in psf_model:
fwhm = psf_model['fwhm']
else:
raise ValueError(
"psf parameter must be either a string ('gaussian', 'moffat') or a PSF model dictionary"
)
# Add metadata to catalog
cat['psf_type'] = psf_type_name
cat['fwhm'] = fwhm
cat['type'] = 'star'
cat['is_real'] = True
# Place stars using PSFEx-compatible placement
for star in cat:
x, y, flux = star['x'], star['y'], star['flux']
# Use psf.place_psf_stamp for fast, accurate placement with sub-pixel positioning
psf_module.place_psf_stamp(image, psf_model, x, y, flux=flux)
# Add diffraction spikes if enabled and star is bright enough
if diffraction_spikes and flux > spike_threshold:
# Need to create spike stamp and add it manually
size = int(np.ceil(fwhm * 6)) * 2 + 1
ix, iy = int(np.round(x)), int(np.round(y))
dx_sub = x - ix
dy_sub = y - iy
x0_spike = size // 2 + dx_sub
y0_spike = size // 2 + dy_sub
spikes = create_diffraction_spikes(size, x0_spike, y0_spike, flux)
# Place spikes on image
y_grid, x_grid = np.mgrid[0 : spikes.shape[0], 0 : spikes.shape[1]]
y1, x1 = np.mgrid[0 : spikes.shape[0], 0 : spikes.shape[1]]
x1 += ix - np.floor(spikes.shape[1] // 2)
y1 += iy - np.floor(spikes.shape[0] // 2)
idx = (x1 >= 0) & (x1 < image.shape[1])
idx &= (y1 >= 0) & (y1 < image.shape[0])
image[y1[idx].astype(int), x1[idx].astype(int)] += spikes[y_grid[idx], x_grid[idx]]
# Add optical ghost if enabled and star is very bright
if optical_ghosts and flux > ghost_threshold:
size = int(np.ceil(fwhm * 6)) * 2 + 1
ix, iy = int(np.round(x)), int(np.round(y))
dx_sub = x - ix
dy_sub = y - iy
x0_ghost = size // 2 + dx_sub
y0_ghost = size // 2 + dy_sub
ghost_result = create_optical_ghost(size, x0_ghost, y0_ghost, flux)
ghost = ghost_result['stamp']
src_x, src_y = ghost_result['source_pos']
# Place ghost on image
# The source position within the stamp (src_x, src_y) should align
# with the star position in the image (ix, iy)
y_grid, x_grid = np.mgrid[0 : ghost.shape[0], 0 : ghost.shape[1]]
y1, x1 = np.mgrid[0 : ghost.shape[0], 0 : ghost.shape[1]]
# Offset so that stamp position (src_x, src_y) maps to image position (ix, iy)
x1 += ix - int(np.round(src_x))
y1 += iy - int(np.round(src_y))
idx = (x1 >= 0) & (x1 < image.shape[1])
idx &= (y1 >= 0) & (y1 < image.shape[0])
image[y1[idx].astype(int), x1[idx].astype(int)] += ghost[y_grid[idx], x_grid[idx]]
# Apply saturation
if saturation is not None:
image[image > saturation] = saturation
return cat
[docs]
def add_galaxies(
image,
n=20,
flux_range=(500, 5000),
r_eff_range=(3, 15),
n_range=(0.5, 4.0),
ellipticity_range=(0.0, 0.7),
edge=0,
wcs=None,
):
"""
Add galaxies with Sersic profiles to an image.
Parameters
----------
image : numpy.ndarray
Target image (modified in-place).
n : int, optional
Number of galaxies to add.
flux_range : tuple, optional
(min, max) total flux in ADU.
r_eff_range : tuple, optional
(min, max) effective radius in pixels.
n_range : tuple, optional
(min, max) Sersic index.
ellipticity_range : tuple, optional
(min, max) ellipticity.
edge : int, optional
Minimum distance from image edges.
wcs : astropy.wcs.WCS, optional
WCS object for computing RA/Dec.
Returns
-------
astropy.table.Table
Catalog of added galaxies.
"""
height, width = image.shape
galaxies = []
for i in range(n):
# Random parameters
x = np.random.uniform(edge, width - 1 - edge)
y = np.random.uniform(edge, height - 1 - edge)
flux = 10 ** np.random.uniform(np.log10(flux_range[0]), np.log10(flux_range[1]))
r_eff = np.random.uniform(*r_eff_range)
sersic_n = np.random.uniform(*n_range)
ellipticity = np.random.uniform(*ellipticity_range)
position_angle = np.random.uniform(0, 180)
# Place galaxy
place_galaxy(image, x, y, flux, r_eff, sersic_n, ellipticity, position_angle)
# Catalog entry
galaxy = {
'x': x,
'y': y,
'flux': flux,
'r_eff': r_eff,
'sersic_n': sersic_n,
'ellipticity': ellipticity,
'position_angle': position_angle,
'type': 'galaxy',
'is_real': True,
}
if wcs is not None and wcs.celestial:
galaxy['ra'], galaxy['dec'] = wcs.all_pix2world(x, y, 0)
else:
galaxy['ra'], galaxy['dec'] = np.nan, np.nan
galaxies.append(galaxy)
return Table(galaxies)
[docs]
def simulate_image(
width,
height,
n_stars=100,
star_flux_range=(100, 10000),
star_fwhm=3.0,
star_psf='gaussian',
star_beta=2.5,
n_galaxies=20,
galaxy_flux_range=(500, 5000),
galaxy_r_eff_range=(3, 15),
galaxy_n_range=(0.5, 4.0),
galaxy_ellipticity_range=(0.0, 0.7),
n_cosmic_rays=5,
cosmic_ray_length_range=(5, 50),
cosmic_ray_width_range=(1, 3),
cosmic_ray_intensity_range=(1000, 10000),
cosmic_ray_profile='worm',
n_hot_pixels=10,
hot_pixel_intensity_range=(500, 5000),
n_bad_columns=0,
bad_column_type='dead',
n_satellites=0,
satellite_length_range=(100, 400),
satellite_width_range=(2, 8),
satellite_intensity_range=(5000, 20000),
diffraction_spikes=False,
spike_threshold=50000,
optical_ghosts=False,
ghost_threshold=100000,
add_companions=False,
companion_fraction=0.2,
companion_separation_fwhm=(1.0, 3.0),
companion_flux_ratio=(0.5, 1.5),
background=1000.0,
readnoise=10.0,
gain=1.0,
edge=10,
wcs=None,
return_catalog=True,
return_masks=False,
seed=None,
verbose=False,
):
"""
Simulate a realistic astronomical image with stars, galaxies, and contaminants.
This is the high-level API that creates a complete simulated image in one call.
Parameters
----------
width : int, optional
Image width in pixels.
height : int, optional
Image height in pixels.
n_stars : int, optional
Number of stars to add.
star_flux_range : tuple, optional
(min, max) star flux in ADU.
star_fwhm : float, optional
FWHM of stellar PSF in pixels.
star_psf : str or dict, optional
PSF type — either a string ('gaussian', 'moffat') or a PSF model dict
(from create_psf_model). Use dict to specify aberrated PSFs.
star_beta : float, optional
Moffat beta parameter (if star_psf='moffat').
n_galaxies : int, optional
Number of galaxies to add.
galaxy_flux_range : tuple, optional
(min, max) galaxy flux in ADU.
galaxy_r_eff_range : tuple, optional
(min, max) effective radius in pixels.
galaxy_n_range : tuple, optional
(min, max) Sersic index.
galaxy_ellipticity_range : tuple, optional
(min, max) ellipticity.
n_cosmic_rays : int, optional
Number of cosmic ray tracks.
cosmic_ray_length_range : tuple, optional
(min, max) cosmic ray length in pixels.
cosmic_ray_width_range : tuple, optional
(min, max) cosmic ray width in pixels.
cosmic_ray_intensity_range : tuple, optional
(min, max) cosmic ray peak intensity.
cosmic_ray_profile : str, optional
Cosmic ray profile ('sharp', 'tapered', 'worm').
n_hot_pixels : int, optional
Number of hot pixels.
hot_pixel_intensity_range : tuple, optional
(min, max) hot pixel intensity.
n_bad_columns : int, optional
Number of bad columns.
bad_column_type : str, optional
Type of bad columns ('dead', 'hot', 'noisy').
n_satellites : int, optional
Number of satellite trails.
satellite_length_range : tuple, optional
(min, max) satellite trail length in pixels.
satellite_width_range : tuple, optional
(min, max) satellite trail width in pixels.
satellite_intensity_range : tuple, optional
(min, max) satellite trail intensity.
diffraction_spikes : bool, optional
Add diffraction spikes to bright stars.
spike_threshold : float, optional
Flux threshold for diffraction spikes.
optical_ghosts : bool, optional
Add optical ghosts to very bright stars.
ghost_threshold : float, optional
Flux threshold for optical ghosts.
add_companions : bool, optional
If True, add close stellar companions to simulate crowded fields.
companion_fraction : float, optional
Fraction of stars (0-1) that will have companions.
companion_separation_fwhm : tuple, optional
(min, max) companion separation in FWHM units.
companion_flux_ratio : tuple, optional
(min, max) companion flux relative to primary star.
background : float, optional
Background level in ADU.
readnoise : float, optional
Read noise in ADU.
gain : float, optional
Detector gain in e-/ADU.
edge : int, optional
Minimum distance from image edges for sources.
wcs : astropy.wcs.WCS, optional
WCS object for computing sky coordinates.
return_catalog : bool, optional
If True, return catalog of all injected sources.
return_masks : bool, optional
If True, return separate masks for each artifact type.
seed : int, optional
Random seed for reproducibility. If set, calls np.random.seed(seed).
verbose : bool, optional
Enable verbose output.
Returns
-------
dict
Dictionary with 'image', 'catalog' (if requested), 'masks' (if requested),
'background', 'noise'.
"""
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if seed is not None:
np.random.seed(seed)
log(f"Simulating {width}x{height} image with {n_stars} stars and {n_galaxies} galaxies")
# Initialize image with background
image = np.ones((height, width), dtype=float) * background
# Collect all catalogs
catalogs = []
# Add stars
if n_stars > 0:
log(f"Adding {n_stars} stars...")
# Resolve effective FWHM: if star_psf is a dict with its own fwhm, use that
effective_fwhm = star_fwhm
if isinstance(star_psf, dict) and 'fwhm' in star_psf:
effective_fwhm = star_psf['fwhm']
# Add companions to catalog before placement if requested
if add_companions:
# First add the base stars
cat_stars = add_stars(
image,
n=n_stars,
flux_range=star_flux_range,
fwhm=star_fwhm,
psf=star_psf,
beta=star_beta,
edge=edge,
diffraction_spikes=diffraction_spikes,
spike_threshold=spike_threshold,
optical_ghosts=optical_ghosts,
ghost_threshold=ghost_threshold,
wcs=wcs,
)
# Get number of companions before adding
n_before = len(cat_stars)
# Add companions to catalog
cat_stars_with_companions = add_close_companions_to_catalog(
cat_stars,
fwhm=effective_fwhm,
fraction=companion_fraction,
min_separation_fwhm=companion_separation_fwhm[0],
max_separation_fwhm=companion_separation_fwhm[1],
flux_variation=companion_flux_ratio,
image_shape=(height, width),
edge=edge,
)
n_companions = len(cat_stars_with_companions) - n_before
if n_companions > 0:
log(f" Added {n_companions} close companions")
# Place only the companion stars (primaries already placed)
# Determine PSF model for companions
if isinstance(star_psf, str):
psf_model = create_psf_model(fwhm=star_fwhm, psf_type=star_psf, beta=star_beta)
else:
psf_model = star_psf
# Place companions
from . import psf as psf_module
for companion in cat_stars_with_companions[n_before:]:
psf_module.place_psf_stamp(
image, psf_model, companion['x'], companion['y'], flux=companion['flux']
)
cat_stars = cat_stars_with_companions
else:
# No companions - use standard add_stars
cat_stars = add_stars(
image,
n=n_stars,
flux_range=star_flux_range,
fwhm=star_fwhm,
psf=star_psf,
beta=star_beta,
edge=edge,
diffraction_spikes=diffraction_spikes,
spike_threshold=spike_threshold,
optical_ghosts=optical_ghosts,
ghost_threshold=ghost_threshold,
wcs=wcs,
)
catalogs.append(cat_stars)
# Add galaxies
if n_galaxies > 0:
log(f"Adding {n_galaxies} galaxies...")
cat_galaxies = add_galaxies(
image,
n=n_galaxies,
flux_range=galaxy_flux_range,
r_eff_range=galaxy_r_eff_range,
n_range=galaxy_n_range,
ellipticity_range=galaxy_ellipticity_range,
edge=edge,
wcs=wcs,
)
catalogs.append(cat_galaxies)
# Add cosmic rays
if n_cosmic_rays > 0:
log(f"Adding {n_cosmic_rays} cosmic rays...")
cat_cosmic = add_cosmic_rays(
image,
n_rays=n_cosmic_rays,
length_range=cosmic_ray_length_range,
width_range=cosmic_ray_width_range,
intensity_range=cosmic_ray_intensity_range,
profile=cosmic_ray_profile,
)
catalogs.append(cat_cosmic)
# Add hot pixels
if n_hot_pixels > 0:
log(f"Adding {n_hot_pixels} hot pixels...")
cat_hot = add_hot_pixels(
image, n_pixels=n_hot_pixels, intensity_range=hot_pixel_intensity_range
)
catalogs.append(cat_hot)
# Add bad columns
if n_bad_columns > 0:
log(f"Adding {n_bad_columns} bad columns...")
cat_bad = add_bad_columns(image, n_columns=n_bad_columns, bad_type=bad_column_type)
catalogs.append(cat_bad)
# Add satellite trails
if n_satellites > 0:
log(f"Adding {n_satellites} satellite trails...")
cat_satellites = add_satellite_trails(
image,
n_trails=n_satellites,
length_range=satellite_length_range,
width_range=satellite_width_range,
intensity_range=satellite_intensity_range,
)
catalogs.append(cat_satellites)
# Apply Poisson noise to entire image (sources + background) in one shot.
# Physically correct: noise from photon counting on total signal per pixel.
log("Adding noise...")
if gain is not None and gain > 0:
image_e = image * gain
idx = image_e > 0
image_e[idx] = np.random.poisson(image_e[idx].astype(np.float64))
image = image_e / gain
# Add readnoise (Gaussian, after Poisson — correct physical order)
noise_map = np.random.normal(0, readnoise, (height, width))
image += noise_map
# Prepare output
result = {
'image': image,
'background': np.ones((height, width)) * background,
'noise': noise_map,
}
# Combine catalogs
if return_catalog and len(catalogs) > 0:
from astropy.table import vstack
# Stack all catalogs
combined_cat = vstack(catalogs, join_type='outer')
result['catalog'] = combined_cat
if return_masks:
# Create separate masks for each artifact type
# This would require tracking which pixels were modified by each artifact
# For now, return empty dict (can be implemented later if needed)
result['masks'] = {}
log("Simulation complete")
return result
[docs]
def generate_realbogus_training_data(
n_images=100,
image_size=(2048, 2048),
n_stars_range=(50, 200),
n_galaxies_range=(10, 50),
fwhm_range=(1.5, 8.0),
background_range=(100, 10000),
n_cosmic_rays_range=(5, 20),
n_hot_pixels_range=(5, 30),
n_satellites_range=(0, 2),
detection_threshold=3.0,
match_radius=3.0,
cutout_radius=15,
augment=True,
real_source_types=['star'],
close_pair_fraction=0.2,
min_separation_fwhm=1.0,
max_separation_fwhm=3.0,
aberration_fraction=0.0,
defocus_range=(0.0, 2.0),
astigmatism_range=(0.0, 1.5),
coma_range=(0.0, 1.0),
readnoise_range=(5.0, 20.0),
gain_range=(0.5, 2.0),
asinh_softening=None,
seed=None,
verbose=False,
):
"""
Generate labeled training data for real-bogus classifier.
This function creates a dataset of labeled cutouts by:
1. Simulating images with known source positions
2. Running object detection
3. Matching detections to truth catalog
4. Labeling matched detections as 'real', others as 'bogus'
5. Extracting and preprocessing cutouts
6. Optionally applying data augmentation
Flux ranges are automatically calculated for each image based on the background
noise level to ensure sources are detectable:
- Minimum flux = detection_threshold × noise (e.g., 3σ for detectability)
- Maximum flux = 1000 × noise (bright sources)
This prevents generating sources that are too faint to detect or unrealistically bright.
Close pairs are automatically added to ensure the classifier learns to accept
crowded sources. A fraction of stars will have stellar companions added at
controlled separations (e.g., 1-3 FWHM). Companions are only added to stars,
not galaxies.
PSF diversity can be included in training data by setting aberration_fraction > 0.
A fraction of images will use aberrated PSFs with randomized Zernike coefficients
(defocus, astigmatism, coma) computed via Fourier optics, helping the classifier
handle realistic PSF variations from optical aberrations.
Parameters
----------
n_images : int, optional
Number of simulated images to generate.
image_size : tuple, optional
(width, height) of simulated images.
n_stars_range : tuple, optional
(min, max) number of stars per image.
n_galaxies_range : tuple, optional
(min, max) number of galaxies per image.
fwhm_range : tuple, optional
(min, max) FWHM in pixels (varied per image).
background_range : tuple, optional
(min, max) background level in ADU.
n_cosmic_rays_range : tuple, optional
(min, max) cosmic rays per image.
n_hot_pixels_range : tuple, optional
(min, max) hot pixels per image.
n_satellites_range : tuple, optional
(min, max) satellite trails per image.
detection_threshold : float, optional
Detection threshold in sigma (also used for min flux calculation).
match_radius : float, optional
Matching radius in pixels for truth matching.
cutout_radius : int, optional
Cutout radius in pixels.
augment : bool, optional
Apply data augmentation (rotations, flips).
real_source_types : list, optional
List of source types to consider 'real'. Default: ['star'] treats only stars
as real and galaxies as bogus. Use ['star', 'galaxy'] to treat both as real.
close_pair_fraction : float, optional
Fraction of sources (0-1) that will have close companions added.
Default: 0.2 (20% of sources get companions).
min_separation_fwhm : float, optional
Minimum separation for companions in FWHM units. Default: 1.0 (1x FWHM).
max_separation_fwhm : float, optional
Maximum separation for companions in FWHM units. Default: 3.0 (3x FWHM).
aberration_fraction : float, optional
Fraction of images (0-1) with aberrated PSFs. Default: 0.0 (all Gaussian).
When > 0 and aberration ranges have non-zero max values, uses Fourier optics
with Zernike polynomials. Otherwise falls back to Moffat PSFs as a simpler proxy.
defocus_range : tuple, optional
(min, max) defocus aberration in waves (default: (0.0, 2.0)).
astigmatism_range : tuple, optional
(min, max) astigmatism aberration in waves (default: (0.0, 1.5)).
coma_range : tuple, optional
(min, max) coma aberration in waves (default: (0.0, 1.0)).
readnoise_range : tuple, optional
(min, max) read noise in ADU (randomized per image).
gain_range : tuple, optional
(min, max) detector gain in e-/ADU (randomized per image).
asinh_softening : float, optional
Asinh softening in units of background sigma for realbogus preprocessing.
If None, uses DEFAULT_ASINH_SOFTENING_SIGMA.
seed : int, optional
Random seed for reproducibility. If set, calls np.random.seed(seed).
verbose : bool, optional
Print progress.
Returns
-------
dict
Dictionary with 'X' (cutouts), 'y' (labels), 'fwhm' (FWHM values), 'metadata'.
"""
from . import photometry
from . import realbogus
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
if seed is not None:
np.random.seed(seed)
log(f"Generating training data from {n_images} simulated images...")
width, height = image_size
# Containers for training data
all_cutouts = []
all_labels = []
all_metadata = []
for img_idx in range(n_images):
# Randomize parameters for diversity
fwhm = np.random.uniform(*fwhm_range)
background = 10 ** np.random.uniform(
np.log10(background_range[0]), np.log10(background_range[1])
)
n_stars = np.random.randint(*n_stars_range)
n_galaxies = np.random.randint(*n_galaxies_range)
n_cosmic_rays = np.random.randint(*n_cosmic_rays_range)
n_hot_pixels = np.random.randint(*n_hot_pixels_range)
n_satellites = np.random.randint(*n_satellites_range) if n_satellites_range[1] > 0 else 0
# Randomize readnoise and gain for diversity
readnoise = np.random.uniform(*readnoise_range)
gain = np.random.uniform(*gain_range)
aperture_radius = fwhm
n_pix = np.pi * aperture_radius**2
# Noise in aperture (photon noise + readnoise)
noise_aperture = np.sqrt(n_pix * (background / gain + readnoise**2))
# Set flux ranges based on S/N ratio
# min_flux: detectable at threshold (e.g., 3 sigma)
# max_flux: very bright sources (1000 sigma)
min_sn = detection_threshold
max_sn = 1000.0
star_flux_range = (min_sn * noise_aperture, max_sn * noise_aperture)
galaxy_flux_range = (min_sn * noise_aperture, max_sn * noise_aperture)
# Optionally use aberrated PSF for this image to add diversity
psf_type_str = 'gaussian'
has_aberration_ranges = (
defocus_range[1] > 0 or astigmatism_range[1] > 0 or coma_range[1] > 0
)
if aberration_fraction > 0 and np.random.random() < aberration_fraction:
if has_aberration_ranges:
# Use Fourier optics aberrations with random Zernike coefficients
defocus = np.random.uniform(*defocus_range) if defocus_range[1] > 0 else 0.0
astig_mag = (
np.random.uniform(*astigmatism_range) if astigmatism_range[1] > 0 else 0.0
)
coma_mag = np.random.uniform(*coma_range) if coma_range[1] > 0 else 0.0
# Random orientation for directional aberrations
angle = np.random.uniform(0, 2 * np.pi)
astigmatism_x = astig_mag * np.cos(angle)
astigmatism_y = astig_mag * np.sin(angle)
coma_x = coma_mag * np.cos(angle)
coma_y = coma_mag * np.sin(angle)
star_psf = create_psf_model(
fwhm=fwhm,
psf_type='gaussian',
defocus=defocus,
astigmatism_x=astigmatism_x,
astigmatism_y=astigmatism_y,
coma_x=coma_x,
coma_y=coma_y,
)
psf_type_str = (
f'aberrated(def={defocus:.1f},ast={astig_mag:.1f},coma={coma_mag:.1f})'
)
else:
# Fall back to Moffat PSF as simpler proxy for non-ideal optics
beta = np.random.uniform(1.5, 4.5)
star_psf = create_psf_model(fwhm=fwhm, psf_type='moffat', beta=beta)
psf_type_str = f'moffat(beta={beta:.1f})'
else:
# Use standard Gaussian PSF
star_psf = 'gaussian'
log(
f"Image {img_idx + 1}/{n_images}: FWHM={fwhm:.2f}, BG={background:.1f}, "
f"stars={n_stars}, gal={n_galaxies}, CR={n_cosmic_rays}, hot={n_hot_pixels}, "
f"flux_range={star_flux_range[0]:.0f}-{star_flux_range[1]:.0f}, PSF={psf_type_str}"
)
# Simulate image with companions (cleaner, unified approach)
sim = simulate_image(
width=width,
height=height,
n_stars=n_stars,
star_flux_range=star_flux_range,
star_fwhm=fwhm,
star_psf=star_psf, # Can be string or PSF model dict
n_galaxies=n_galaxies,
galaxy_flux_range=galaxy_flux_range,
n_cosmic_rays=n_cosmic_rays,
n_hot_pixels=n_hot_pixels,
n_satellites=n_satellites,
add_companions=close_pair_fraction > 0,
companion_fraction=close_pair_fraction,
companion_separation_fwhm=(min_separation_fwhm, max_separation_fwhm),
companion_flux_ratio=(0.5, 1.5),
background=background,
readnoise=readnoise,
gain=gain,
return_catalog=True,
verbose=False,
)
image = sim['image']
truth_catalog = sim['catalog']
# Detect objects
try:
detected = photometry.get_objects_sep(
image,
thresh=detection_threshold,
aper=fwhm,
minarea=5,
verbose=False,
)
except Exception as e:
log(f"Detection failed for image {img_idx + 1}: {e}")
continue
if len(detected) == 0:
log(f"No detections in image {img_idx + 1}, skipping")
continue
# Match detections to truth catalog
# Real sources: determined by real_source_types parameter
# Artifacts: cosmic rays, hot pixels, satellites (type='cosmic_ray', etc.)
# Build filter for real sources based on real_source_types
real_filter = np.zeros(len(truth_catalog), dtype=bool)
for source_type in real_source_types:
real_filter |= truth_catalog['type'] == source_type
truth_real = truth_catalog[real_filter]
# Simple distance-based matching
matched_indices = np.full(len(detected), -1, dtype=int)
for i, det in enumerate(detected):
dx = truth_real['x'] - det['x']
dy = truth_real['y'] - det['y']
dist = np.sqrt(dx**2 + dy**2)
if len(dist) > 0 and np.min(dist) < match_radius:
matched_indices[i] = 1 # Real source
else:
matched_indices[i] = 0 # Bogus (artifact or spurious)
# Count matches
n_real = np.sum(matched_indices == 1)
n_bogus = np.sum(matched_indices == 0)
log(f" Matched: {n_real} real, {n_bogus} bogus from {len(detected)} detections")
if n_real == 0 and n_bogus == 0:
continue
# Extract cutouts
try:
cutouts, valid_indices = realbogus.extract_cutouts(
detected,
image,
bg=background,
radius=cutout_radius,
fwhm=fwhm,
asinh_softening=asinh_softening,
verbose=False,
)
except Exception as e:
log(f"Cutout extraction failed for image {img_idx + 1}: {e}")
continue
# Get labels for valid cutouts
labels = matched_indices[valid_indices]
# Store data
all_cutouts.append(cutouts)
all_labels.append(labels)
# Metadata for each cutout
metadata = {
'image_idx': np.full(len(cutouts), img_idx),
'fwhm': np.full(len(cutouts), fwhm),
'background': np.full(len(cutouts), background),
}
all_metadata.append(metadata)
# Concatenate all data
if len(all_cutouts) == 0:
raise ValueError("No valid training data generated")
X = np.concatenate(all_cutouts, axis=0)
y = np.concatenate(all_labels, axis=0)
log(f"Total samples: {len(X)} ({np.sum(y)} real, {len(y) - np.sum(y)} bogus)")
# Apply data augmentation
if augment:
log("Applying data augmentation...")
X_aug, y_aug = _augment_training_data(X, y, verbose=verbose)
log(f"After augmentation: {len(X_aug)} samples")
else:
X_aug = X
y_aug = y
# Convert labels to binary (ensure proper dtype)
y_aug = y_aug.astype(np.float32)
result = {
'X': X_aug,
'y': y_aug,
'metadata': all_metadata,
}
return result
def _augment_training_data(X, y, augment_factor=8, verbose=False):
"""
Apply data augmentation to training cutouts.
Augmentation strategies:
- Rotations: 0°, 90°, 180°, 270°
- Flips: horizontal, vertical
- Combined: rotations + flips (~8-16× augmentation)
Parameters
----------
X : numpy.ndarray
Input cutouts (N, H, W, C).
y : numpy.ndarray
Labels (N,).
augment_factor : int, optional
Target augmentation factor.
verbose : bool, optional
Print progress.
Returns
-------
tuple
(X_aug, y_aug) with augmented data.
"""
log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None
n_samples = len(X)
X_list = [X]
y_list = [y]
# Rotation augmentation (90°, 180°, 270°)
for angle in [90, 180, 270]:
X_list.append(np.rot90(X, k=angle // 90, axes=(1, 2)))
y_list.append(y)
# Flip augmentation
if augment_factor >= 8:
X_list.append(np.flip(X, axis=2)) # Horizontal flip
y_list.append(y)
X_list.append(np.flip(X, axis=1)) # Vertical flip
y_list.append(y)
X_list.append(np.flip(np.flip(X, axis=1), axis=2)) # Both flips
y_list.append(y)
X_aug = np.concatenate(X_list, axis=0)
y_aug = np.concatenate(y_list, axis=0)
log(
f"Augmentation: {n_samples} → {len(X_aug)} samples ({len(X_aug) / n_samples:.1f}× increase)"
)
return X_aug, y_aug