Source code for stdpipe.lcs

import sys
import numpy as np
from astropy.stats import mad_std
from astropy.time import Time

from scipy.spatial import cKDTree

from . import astrometry


# Fast-conversion registry for types whose element-by-element iteration is
# catastrophically slow (e.g. astropy.time.Time, where list.extend() on a
# ~3.5 M-row column boxes each scalar individually and takes ~20 s).
#
# Each handler is a callable ``handler(val) -> (storage_array, finalizer)``:
#   * ``storage_array`` is an ndarray of fast-iterating scalars (typically
#     float64) that is appended to the per-key Python list.
#   * ``finalizer`` is called once during :meth:`LCs.cluster` consolidation
#     as ``finalizer(ndarray) -> typed_object`` to rebuild the original
#     container from the accumulated storage values.
#
# To add support for another slow-iterating type, append a (predicate,
# handler) tuple to ``_FAST_TYPE_HANDLERS``.


def _time_handler(val):
    """Round-trip Time through MJD floats (~150 ms for 3.5 M rows)."""
    scale, fmt = val.scale, val.format

    def finalize(arr):
        out = Time(arr, format='mjd', scale=scale)
        if fmt != 'mjd':
            out.format = fmt
        return out

    return val.mjd, finalize


_FAST_TYPE_HANDLERS = [
    (lambda v: isinstance(v, Time), _time_handler),
]


def _fast_storage_handler(val):
    for predicate, handler in _FAST_TYPE_HANDLERS:
        if predicate(val):
            return handler
    return None


[docs] class LCs: """ Container for light-curve data vectors with spatial clustering utilities. Stores user-provided per-detection vectors (e.g., ra/dec/flux/time) and groups detections into spatial clusters using a KDTree radius search. Clustering returns per-cluster centroids and member indices in `self.lcs`. Notes ----- - `add()` broadcasts scalars to the length of the longest input vector. - `cluster()` refines centroids and can call an `analyze(self, ids)` callback per cluster. - Coordinate jitter is applied when building the KDTree to avoid degeneracy from repeated positions. - Clustering results are stored in `self.lcs` with keys: - `x`, `y`, `z`: centroid unit-vector coordinates. - `ra`, `dec`: centroid sky coordinates in degrees. - `N`: number of points per cluster. - `ids`: list of index arrays for member points in the container. - `kd`: KDTree built from centroid vectors for fast queries. """ def __init__(self): # Storage for user-supplied data vectors self._params = {} # Per-key finalizer callables for keys whose values were stored via # the fast-conversion path (see ``_FAST_TYPE_HANDLERS``). Applied # during :meth:`cluster` consolidation to restore the original type. self._finalizers = {} self.lcs = None self.kd = None def __getattr__(self, name): # Allows direct access to stored data vectors if name in self._params: return self._params.get(name) else: raise AttributeError def __dir__(self): # For auto-completion of stored data vectors names return ( list(self.__dict__.keys()) + list(self.__class__.__dict__.keys()) + list(self._params.keys()) )
[docs] def add(self, **kwargs): """ Add per-detection vectors to the container. Each keyword defines a stored vector. Scalars are broadcast to the length of the longest input vector, and missing values are filled with None. This method may be called repeatedly to append new chunks of measurements (e.g., per-image batches) to the existing vectors. Examples -------- >>> lcs = LCs() >>> lcs.add(ra=[1, 2], dec=[3, 4], flux=10.0) """ def extend(col, val, length, key): if ( val is not None and hasattr(val, "__len__") and not isinstance(val, str) and len(val) == length ): # Some array-likes (notably astropy.time.Time) iterate so # slowly element-by-element that ``list.extend(val)`` becomes # the bottleneck. Route them through a fast bulk-conversion # path; the original type is restored during ``cluster()``. handler = _fast_storage_handler(val) if handler is not None: storage_arr, finalizer = handler(val) col.extend(storage_arr.tolist()) self._finalizers.setdefault(key, finalizer) elif hasattr(val, 'tolist'): # ndarray / astropy.table.Column: bulk C-level conversion # avoids the ~10x slowdown of per-element iteration in # ``list.extend(Column)``. col.extend(val.tolist()) else: col.extend(val) elif val is not None: col.extend(np.repeat(val, length)) else: col.extend(np.repeat(None, length)) # Estimate vector length from input data - for now, just as a max length of individual values length = 0 for _ in kwargs: if hasattr(kwargs[_], '__len__') and not isinstance(kwargs[_], str): length = max(length, len(kwargs[_])) for key in kwargs: if key not in self._params: self._params[key] = [] extend(self._params[key], kwargs[key], length, key)
[docs] def cluster( self, sr=1 / 3600, min_length=None, col_ra='ra', col_dec='dec', verbose=True, analyze=None, N=1000, max_refine_iter=1, ): """ Spatially cluster the data vectors using ra/dec values stored in `col_ra` and `col_dec`. Parameters ---------- sr : float, optional Clustering radius in degrees. min_length : int or None, optional Minimum number of points required to keep a cluster. col_ra : str, optional Name of the RA column in stored vectors. col_dec : str, optional Name of the Dec column in stored vectors. verbose : bool or callable, optional Logging control, can be a print-like function. analyze : callable or None, optional Optional callback `analyze(self, ids)` called per accepted cluster. Any returned mapping entries are appended into `self.lcs` under their respective keys (one entry per cluster). N : int, optional Progress update interval in points. max_refine_iter : int, optional Maximum number of centroid refinement iterations (default 1). """ log = (verbose if callable(verbose) else print) if verbose else lambda *args, **kwargs: None if min_length is None: min_length = 0 sr0 = np.deg2rad(sr) if type(self._params[col_ra]) is not np.ndarray: log('Converting arrays') for name in self._params: arr = np.array(self._params[name]) finalizer = self._finalizers.get(name) if finalizer is not None: arr = finalizer(arr) self._params[name] = arr self._xarr, self._yarr, self._zarr = astrometry.radectoxyz( self._params[col_ra], self._params[col_dec] ) # Add some additional jitter to coordinates, or KDTree may hang on repeating positions self._xarr = np.random.normal(self._xarr, 0.01 / 206265) self._yarr = np.random.normal(self._yarr, 0.01 / 206265) self._zarr = np.random.normal(self._zarr, 0.01 / 206265) self.kd = cKDTree(np.array([self._xarr, self._yarr, self._zarr]).T) def refine_pos(x, y, z): """Returns mean position for a list of individual positions""" x1, y1, z1 = [np.mean(_) for _ in [x, y, z]] # Normalize back to unit sphere r = np.sqrt(x1 * x1 + y1 * y1 + z1 * z1) x1, y1, z1 = [_ / r for _ in [x1, y1, z1]] return x1, y1, z1 xarr = self._xarr yarr = self._yarr zarr = self._zarr kd = self.kd vmask = np.zeros_like(self._params[col_ra], bool) self.lcs = {'x': [], 'y': [], 'z': [], 'N': [], 'ids': []} lcs_x = self.lcs['x'] lcs_y = self.lcs['y'] lcs_z = self.lcs['z'] lcs_n = self.lcs['N'] lcs_ids = self.lcs['ids'] log( 'Starting spatial clustering of %d points with %.1f arcsec radius' % (len(vmask), sr * 3600) ) def process_seed(i): # Select points around seed position ids = kd.query_ball_point([xarr[i], yarr[i], zarr[i]], sr0) if len(ids) < min_length: vmask[ids] = True return x1, y1, z1 = refine_pos(xarr[ids], yarr[ids], zarr[ids]) ids = kd.query_ball_point([x1, y1, z1], sr0) for _ in range(max_refine_iter - 1): x2, y2, z2 = refine_pos(xarr[ids], yarr[ids], zarr[ids]) ids2 = kd.query_ball_point([x2, y2, z2], sr0) if len(ids2) == len(ids) and np.all(np.in1d(ids2, ids)): x1, y1, z1 = x2, y2, z2 break ids = ids2 x1, y1, z1 = x2, y2, z2 vmask[ids] = True # Mask all points around mean position if len(ids) >= min_length: # Actual processing of points lcs_x.append(x1) lcs_y.append(y1) lcs_z.append(z1) lcs_n.append(len(ids)) lcs_ids.append(ids) if analyze is not None and callable(analyze): ares = analyze(self, ids) for _, __ in ares.items(): if _ not in self.lcs: self.lcs[_] = [] self.lcs[_].append(__) for i in range(len(vmask)): if not vmask[i]: process_seed(i) if i % N == 0 and verbose == True: sys.stdout.write("\r %d points - %d lcs" % (i, len(self.lcs['x']))) sys.stdout.flush() if verbose == True: sys.stdout.write("\n") sys.stdout.flush() for _ in self.lcs.keys(): if isinstance(self.lcs[_], list) and _ not in ['ids']: self.lcs[_] = np.array(self.lcs[_]) self.lcs['ra'], self.lcs['dec'] = astrometry.xyztoradec( [self.lcs['x'], self.lcs['y'], self.lcs['z']] ) self.lcs['kd'] = cKDTree(np.array([self.lcs['x'], self.lcs['y'], self.lcs['z']]).T) log('%d spatial clusters isolated' % len(self.lcs['ra']))