import os
import datetime
import copy
import sys
import traceback
import warnings
from packaging.version import Version
from collections import OrderedDict
import numpy as np
from scipy import ndimage
from scipy.signal import fftconvolve
import astropy
from astropy.io import fits
from astropy.table import Table
from stsci.tools.bitmask import bitfield_to_boolean_mask
from astropy.coordinates import SkyCoord, Angle
from astropy import units as u
from astropy.modeling import models, fitting
from photutils import background, DAOStarFinder
from photutils.background import Background2D
from photutils.utils import NoDetectionsWarning
import stwcs
from stwcs.wcsutil import HSTWCS
from stwcs.wcsutil import headerlet
import stsci.tools
from stsci.tools import logutil
from stsci.tools import fileutil
from .. import updatehdr
from . import astrometric_utils as amutils
from . import analyze
from . import deconvolve_utils as decutils
from tweakwcs.matchutils import XYXYMatch
from tweakwcs.imalign import align_wcs
__taskname__ = 'align_utils'
# Default background determination parameter values
BKG_BOX_SIZE = 27
BKG_FILTER_SIZE = 3
CATALOG_TYPES = ['point', 'segment']
MIN_CATALOG_THRESHOLD = 3
RMS_RA_COMMENT = "RMS in RA of WCS fit(mas)"
RMS_DEC_COMMENT = "RMS in Dec of WCS fit(mas)"
MSG_DATEFMT = '%Y%j%H%M%S'
SPLUNK_MSG_FORMAT = '%(asctime)s %(levelname)s src=%(name)s- %(message)s'
log = logutil.create_logger(__name__, level=logutil.logging.NOTSET, stream=sys.stdout,
format=SPLUNK_MSG_FORMAT, datefmt=MSG_DATEFMT)
class AlignmentTable:
    """ This class handles alignment operations for HST data.

    Steps managed by this class include:
      * **find_alignment_sources** : iterates over all inputs and
        identifies sources suitable for alignment.
      * **perform_fit** : cross-matches sources using user-specified method
        and performs fit to user-specified catalog. Cross-matching options
        include: ``relative``, ``2dhist`` or ``default`` as reported by the
        ``get_fit_methods`` method. It then populates the table with the
        results for this fit.
      * **select_fit** : extracts the 'best' fit to be applied to the data with the
        'best' fit determined externally by the user based on the set of fit results
        stored in the ``fit_dict`` table.
      * **apply_fit** : Updates all input image WCSs with the result of the
        selected 'best' fit.
    """

    def __init__(self, input_list, clobber=False, dqname='DQ', process_type='',
                 log_level=logutil.logging.NOTSET, **alignment_pars):
        """
        Parameters
        ----------
        input_list : list
            List of input file names to be provided to ``analyze.analyze_data`` function.

        clobber : bool, optional
            Specifies whether or not to overwrite data on disk with updated versions
            of the data.

        dqname : str, optional
            Allows the user to customize the name of the extension (``extname``)
            containing the data quality flags to be applied to the data during
            source identification.

        process_type : str, optional
            Specifies what type of data processing is being done on the input data.
            Values include: '' (default for pipeline processing), 'SVM', 'MVM'.

        log_level : int, optional
            Set the logging level for this processing.

        alignment_pars : dict
            Set of alignment parameters to be used for the input data.
            ``**alignment_pars`` needs to contain the following entries:

            .. code-block:: python

                {'fwhmpsf': 0.12,  # kernel defining, source finding par
                 # background computing pars
                 'box_size': BKG_BOX_SIZE,
                 'win_size': BKG_FILTER_SIZE,
                 'bkg_estimator': SExtractorBackground,
                 'rms_estimator': StdBackgroundRMS,
                 'nsigma': 5.,
                 'threshold_flag': None,
                 # object finding pars
                 'source_box': 7,
                 'classify': True,
                 'centering_mode': "starfind",
                 'nlargest': None,
                 'plot': False,
                 'vmax': None,
                 'deblend': False}
        """
        log.setLevel(log_level)
        # Register fit methods with the class
        self.fit_methods = {'relative': match_relative_fit,
                            '2dhist': match_2dhist_fit,
                            'default': match_default_fit}

        # Also, register fit method default parameters with the class
        self.fit_pars = {'relative': alignment_pars['match_relative_fit'],
                         '2dhist': alignment_pars['match_2dhist_fit'],
                         'default': alignment_pars['match_default_fit']}

        # merge remaining parameters for individual alignment steps into single set
        self.alignment_pars = alignment_pars['run_align'].copy()
        self.alignment_pars.update(alignment_pars['general'])
        self.alignment_pars.update(alignment_pars['generate_source_catalogs'])
        self.alignment_pars.update(alignment_pars['determine_fit_quality'])
        # Make sure to use the values from "determine_fit_quality" instead of "general"
        # NOTE: all three fit methods share the SAME 'pars' dict object here.
        for key in self.fit_pars:
            self.fit_pars[key]['pars'] = alignment_pars['determine_fit_quality']
            self.fit_pars[key]['pars']['minobj'] = self.alignment_pars['mosaic_fitgeom_list']

        self.dqname = dqname
        self.haplist = []         # HAPImage/SBCHAPImage instances, one per viable exposure
        self.process_list = None  # filenames that passed the analyze filter

        self.zero_dt = starting_dt = datetime.datetime.now()
        log.info(str(starting_dt))
        # Apply filter to input observations to insure that they meet minimum
        # criteria for being able to be aligned
        log.info(
            "{} AlignmentTable: Filter STEP {}".format("-" * 20, "-" * 63))
        self.filtered_table, _ = analyze.analyze_data(input_list, type=process_type)
        log.debug("Input sorted as: \n{}".format(self.filtered_table))

        # Nothing viable to align: report timing and leave process_list as None
        if self.filtered_table['doProcess'].sum() == 0:
            log.warning("No viable images in filtered table - no processing done.\n")
            current_dt = datetime.datetime.now()
            delta_dt = (current_dt - starting_dt).total_seconds()
            log.info('Processing time of AlignmentTable Filter STEP: {} sec'.format(delta_dt))
            return

        # Get the list of all "good" files to use for the alignment
        process_list = self.filtered_table['imageName'][np.where(self.filtered_table['doProcess'])]
        self.process_list = list(process_list)  # Convert process_list from numpy list to regular python list
        log.info("SUCCESS")

        fwhmpsf = self.alignment_pars.get('fwhmpsf')
        default_fwhm_set = False

        try:
            for img in self.process_list:
                log.info("Adding {} to HAPImage list".format(img))
                hdr0 = fits.getheader(img)
                instrume = hdr0.get('instrume')
                # WFPC2 data carries no DETECTOR keyword; label it 'PC'
                if instrume.lower() == 'wfpc2' and 'detector' not in hdr0:
                    detector = 'PC'
                else:
                    detector = hdr0.get('detector')
                # SBC exposures use a specialized source-finding implementation
                if detector == 'SBC':
                    catimg = SBCHAPImage(img)
                else:
                    catimg = HAPImage(img)
                # Build image properties needed for alignment
                catimg.compute_background(box_size=self.alignment_pars['box_size'],
                                          win_size=self.alignment_pars['win_size'],
                                          nsigma=self.alignment_pars['nsigma'],
                                          bkg_estimator=self.alignment_pars['bkg_estimator'],
                                          rms_estimator=self.alignment_pars['rms_estimator'],
                                          threshold_flag=self.alignment_pars['threshold'])
                catimg.build_kernel(fwhmpsf)

                # Enable single-image cosmic-ray cleaning when requested
                catimg.crclean = self.alignment_pars['classify']
                log.info("CATIMG.CRCLEAN: {}".format(catimg.crclean))
                if catimg.crclean:
                    log.info("Recomputing BACKGROUND after applying single-image CR clean")
                    for chip in range(1, catimg.num_sci + 1):
                        sciarr = amutils.crclean_image(catimg.imghdu[("SCI", chip)].data, catimg.threshold[chip],
                                                       catimg.kernel, catimg.kernel_fwhm, background=catimg.bkg[chip])
                        catimg.imghdu[("SCI", chip)].data = sciarr
                    # recompute now that the CRs have been removed (set to 0) from the science arrays
                    catimg.compute_background(box_size=self.alignment_pars['box_size'],
                                              win_size=self.alignment_pars['win_size'],
                                              nsigma=self.alignment_pars['nsigma'],
                                              bkg_estimator=self.alignment_pars['bkg_estimator'],
                                              rms_estimator=self.alignment_pars['rms_estimator'],
                                              threshold_flag=self.alignment_pars['threshold'])
                    log.info("Finished computing revised BACKROUND")
                    catimg.build_kernel(fwhmpsf)
                    log.info("Finished determining revised kernel")

                # Use FWHM from first good exposure as default for remainder of exposures
                if not default_fwhm_set and catimg.kernel is not None:
                    fwhmpsf = catimg.fwhmpsf
                    default_fwhm_set = True

                self.haplist.append(catimg)
        except Exception:
            # Best-effort setup: log, dump the traceback and release any open HDUs
            log.error("Problem encountered in setting up alignment to a catalog")
            traceback.print_exc()
            self.close()

        # Initialize computed attributes
        self.imglist = []  # list of FITSWCSCorrector objects for tweakwcs
        self.reference_catalogs = {}
        self.group_id_dict = {}

        self.fit_dict = {}  # store results of all fitting in this dict for evaluation
        self.selected_fit = None  # identify fit selected by user (as 'best'?) to be applied to images WCSs

    def close(self):
        """Close all HDU handles held by the HAPImage members."""
        for img in self.haplist:
            img.close()

    def find_alignment_sources(self, output=True, crclean=None):
        """Find observable sources in each input exposure.

        Parameters
        ----------
        output : bool, optional
            If True, write each chip's source catalog to a `*_src.reg` file.
        crclean : list of bool or None, optional
            Per-image flags controlling single-image CR removal; defaults
            to False for every image.
        """
        if crclean is None:
            crclean = [False] * len(self.haplist)

        self.extracted_sources = {}
        for img, clean in zip(self.haplist, crclean):
            self.extracted_sources[img.imgname] = {}
            if img.imghdu is not None:
                img.find_alignment_sources(output=output, dqname=self.dqname,
                                           crclean=clean,
                                           **self.alignment_pars)
                self.extracted_sources[img.imgname] = img.catalog_table

                # Allow user to decide when and how to write out catalogs to files
                if output:
                    # write out coord lists to files for diagnostic purposes.
                    # Protip: To display the sources in these files in DS9,
                    #         set the "Coordinate System" option to "Physical"
                    #         when loading the region file.
                    imgroot = "_".join(os.path.basename(img.imgname).split('_')[:-1])
                    for chip in range(1, img.num_sci + 1):
                        chip_cat = img.catalog_table[chip]
                        if chip_cat and len(chip_cat) > 0:
                            regfilename = "{}_sci{}_src.reg".format(imgroot, chip)
                            out_table = Table(chip_cat)
                            # To align with positions of sources in DS9/IRAF
                            # (1-based pixel coordinates)
                            out_table['xcentroid'] += 1
                            out_table['ycentroid'] += 1
                            out_table.write(regfilename,
                                            include_names=["xcentroid", "ycentroid", "mag"],
                                            format="ascii.fast_commented_header")
                            log.info("Wrote region file {}\n".format(regfilename))
        self.close()

    def reset_group_id(self, num_ref):
        """Restore per-chip group_id values saved by ``configure_fit``.

        Needed after ``match_relative_fit`` collapses all group_ids to a
        single ensemble value for absolute alignment.
        """
        for image in self.imglist:
            image.meta["group_id"] = self.group_id_dict["{}_{}".format(image.meta["rootname"], image.meta["chip"])]
            image.meta['num_ref_catalog'] = num_ref

    def configure_fit(self):
        """Convert inputs into tweakwcs corrector objects with catalogs attached."""
        # Convert input images to tweakwcs-compatible FITSWCSCorrector objects and
        # attach source catalogs to them.
        self.imglist = []
        for group_id, image in enumerate(self.process_list):
            if image in self.extracted_sources:
                img = amutils.build_wcscat(image, group_id,
                                           self.extracted_sources[image])
                # add the name of the image to the imglist object
                for im in img:
                    # im.meta['name'] = image
                    log.debug('im.meta[name] = {}'.format(im.meta['name']))
                self.imglist.extend(img)

        # Remember the original group_id for each chip so it can be restored
        # later by reset_group_id()
        self.group_id_dict = {}
        for image in self.imglist:
            self.group_id_dict["{}_{}".format(image.meta["rootname"], image.meta["chip"])] = image.meta["group_id"]

    def get_fit_methods(self):
        """Return the list of method names for all registered functions
        available for performing alignment.
        """
        return list(self.fit_methods.keys())

    def perform_fit(self, method_name, catalog_name, reference_catalog,
                    fitgeom='rscale'):
        """Perform fit using specified method, then determine fit quality

        This method populates the ``fit_dict`` table with the results of the
        specified fit keyed by (<method_name>, <catalog_name>).

        Parameters
        ----------
        method_name : str
            Name of cross-matching/fitting to use for this fit. Options
            are reported by the ``get_fit_methods`` method.

        catalog_name : str
            Name of reference catalog to use for the fit. These are defined
            by the user. Examples include: 'GAIADR1', 'GAIADR2', and 'GAIAedr3'.
            This acts as the label for indexing this fit in the ``fit_dict`` table.

        reference_catalog : ``astropy.table.Table``
            Table containing the reference sources to be used for the fit.

        fitgeom : str, optional
            Type of polynomial fit to perform to determine the correction
            to the WCS. Options include (from more complex to simplest):
            ``general``, ``rscale``, ``rshift``, ``shift``.
        """
        # Updated fits_pars with value for fitgeom
        self.fit_pars[method_name]['fitgeom'] = fitgeom
        log.info("Setting 'fitgeom' parameter to {} for {} fit".format(fitgeom, method_name))

        imglist = self.fit_methods[method_name](self.imglist, reference_catalog,
                                                **self.fit_pars[method_name])

        # save fit algorithm name to dictionary key "fit method" in imglist.
        for imglist_ctr in range(0, len(imglist)):
            imglist[imglist_ctr].meta['fit method'] = method_name

        # store results for evaluation
        # deepcopy so later fits cannot mutate stored results
        self.fit_dict[(catalog_name, method_name)] = copy.deepcopy(imglist)
        self.reference_catalogs[catalog_name] = reference_catalog

        return imglist

    def select_fit(self, catalog_name, method_name):
        """Select the fit that has been identified as 'best'

        Populates the ``filtered_table`` with a row for each input exposure
        that contains the results of the fit selected from the ``fit_dict`` table
        populated by ``perform_fit()``.

        Parameters
        ----------
        catalog_name : str
            Name of reference catalog used for fit to be selected

        method_name : str
            Name of cross-matching used for the selected fit
        """
        if catalog_name is None:
            self.selected_fit = None
            return

        imglist = self.selected_fit = self.fit_dict[(catalog_name, method_name)]
        # A failed fit on the first image invalidates the whole selection
        if imglist[0].meta['fit_info']['status'].startswith("FAILED"):
            self.selected_fit = None
            return

        # Protect the writing of the table within the best_fit_rms
        info_keys = OrderedDict(imglist[0].meta['fit_info']).keys()
        # Update filtered table with number of matched sources and other information
        for item in imglist:
            imgname = item.meta['name']
            index = np.where(self.filtered_table['imageName'] == imgname)[0][0]

            status = item.meta['fit_info']['status']
            if not status.startswith("FAILED"):
                self.filtered_table[index]['catalog'] = item.meta['fit_info']['catalog']
                self.filtered_table[index]['fit_rms'] = item.meta['fit_info']['FIT_RMS']
                self.filtered_table[index]['total_rms'] = item.meta['fit_info']['TOTAL_RMS']

                # Entries used as the reference carry no per-image fit values
                if status.startswith('REF'):
                    continue

                for tweakwcs_info_key in info_keys:
                    if not tweakwcs_info_key.startswith("matched"):
                        if tweakwcs_info_key.lower() == 'rms':
                            self.filtered_table[index]['rms_x'] = item.meta['fit_info'][tweakwcs_info_key][0]
                            self.filtered_table[index]['rms_y'] = item.meta['fit_info'][tweakwcs_info_key][1]

                self.filtered_table[index]['fit_method'] = item.meta['fit method']
                self.filtered_table[index]['catalogSources'] = len(self.reference_catalogs[catalog_name])
                self.filtered_table[index]['matchSources'] = item.meta['fit_info']['nmatches']
                self.filtered_table[index]['rms_ra'] = item.meta['fit_info']['RMS_RA'].value
                self.filtered_table[index]['rms_dec'] = item.meta['fit_info']['RMS_DEC'].value
                self.filtered_table[index]['offset_x'], self.filtered_table[index]['offset_y'] = item.meta['fit_info']['shift']
                self.filtered_table[index]['scale'] = item.meta['fit_info']['<scale>']
                self.filtered_table[index]['rotation'] = item.meta['fit_info']['rot'][1]
            else:
                # Any failed image invalidates the table as a whole
                self.filtered_table = None

    def apply_fit(self, headerlet_filenames=None, fit_label=None):
        """Apply solution from identified fit to image WCS's

        Parameters
        ----------
        headerlet_filenames : dict, optional
            Dictionary relating exposure filenames to headerlet filenames. If None,
            will generate headerlet filenames where _flt or _flc is replaced by
            _flt_hlet or _flc_hlet, respectively.

        fit_label : str
            Name of fit to apply to indicate how the fit was performed in
            the WCSNAME keyword. Common options: IMG, REL, SVM, and MVM.

        Raises
        ------
        ValueError
            If no fit has been selected via ``select_fit()``.
        """
        if not self.selected_fit:
            log.error("No FIT selected for application. Please run 'select_fit()' method.")
            raise ValueError
        # Call update_hdr_wcs()
        headerlet_dict = update_image_wcs_info(self.selected_fit,
                                               headerlet_filenames=headerlet_filenames,
                                               fit_label=fit_label)

        for table_index in range(0, len(self.filtered_table)):
            fname = self.filtered_table[table_index]['imageName']
            row = self.filtered_table[table_index]
            row['headerletFile'] = headerlet_dict[fname] if fname in headerlet_dict else "None"
class HAPImage:
    """Core class defining interface for each input exposure/product

    .. note:: This class is compatible with the CatalogImage class, while including
              additional functionality and attributes required for processing beyond
              catalog generation.
    """

    def __init__(self, filename):
        """
        Parameters
        ----------
        filename : str or `astropy.io.fits.HDUList`
            Name of, or already-opened handle to, the exposure to process.
        """
        if isinstance(filename, str):
            self.imghdu = fits.open(filename)
            self.imgname = filename
        else:
            self.imghdu = filename
            self.imgname = filename.filename()

        if 'rootname' in self.imghdu[0].header:
            self.rootname = self.imghdu[0].header['rootname']
        else:
            # BUGFIX: previously used `self.imgname.rstrip('.fits')`, which
            # strips any trailing characters from the SET {'.','f','i','t','s'}
            # (e.g. 'abc_flt.fits' -> 'abc_fl').  Strip the literal suffix only.
            if self.imgname.endswith('.fits'):
                self.rootname = self.imgname[:-len('.fits')]
            else:
                self.rootname = self.imgname

        # Fits file read
        self.num_sci = amutils.countExtn(self.imghdu)
        self.num_wht = amutils.countExtn(self.imghdu, extname='WHT')
        # Stack all SCI chips into one array for kernel/PSF determination
        self.data = np.concatenate([self.imghdu[('SCI', i + 1)].data for i in range(self.num_sci)])
        if not self.num_wht:
            # Calibrated (FLT/FLC-style) exposure: derive a DQ mask
            self.dqmask = self.build_dqmask()
        else:
            self.dqmask = None
        self.wht_image = self.build_wht_image()

        # Get the HSTWCS object from the first extension
        self.imgwcs = HSTWCS(self.imghdu, 1)
        self.pscale = self.imgwcs.pscale

        self._wht_image = None
        # Per-chip results populated by compute_background(), keyed by chip number
        self.bkg = {}
        self.bkg_dao_rms = {}
        self.bkg_rms_mean = {}
        self.threshold = {}

        # Kernel/PSF attributes populated by build_kernel()
        self.kernel = None
        self.kernel_fwhm = None
        self.kernel_psf = False
        self.fwhmpsf = None
        self.catalog_table = {}

        # Switch to turn on/off use of single-image CR detection/removal
        self.crclean = False

    def build_wht_image(self):
        """Return a weight image for source detection.

        Uses the WHT extension when present; otherwise synthesizes one
        from the ERR extensions (inverse error, zeroed where DQ-flagged).
        """
        if not self.num_wht:
            # Working with a calibrated exposure, no WHT extension
            # create a substitute WHT array from ERR and DQ
            # Build pseudo-wht array for detection purposes
            errarr = np.concatenate([self.imghdu[('ERR', i + 1)].data for i in range(self.num_sci)])
            wht_image = errarr.max() / errarr
            # Division by zero-error pixels yields inf/nan; zero them out
            wht_image = np.nan_to_num(wht_image, nan=0.0, posinf=0.0, neginf=0.0)
            if self.dqmask is not None:
                wht_image[self.dqmask] = 0
        else:
            wht_image = self.imghdu['WHT'].data
        return wht_image

    def close(self):
        """Close the HDU handle and release cached arrays/results."""
        if self.imghdu is not None:
            self.imghdu.close()
            self.imghdu = None
        self.dqmask = None
        self.wht_image = None
        self._wht_image = None
        self.bkg_rms_mean = {}
        self.bkg = {}
        self.bkg_dao_rms = {}

    def build_kernel(self, fwhmpsf):
        """Determine the source-detection kernel (and its FWHM) for this image.

        Parameters
        ----------
        fwhmpsf : float
            Default FWHM of PSF in units of arcseconds.
        """
        # Background products are required before a kernel can be derived
        if self.bkg is None or self.bkg == {}:
            self.compute_background()

        threshold_rms = np.concatenate([rms for rms in self.threshold.values()])
        bkg = np.concatenate([background for background in self.bkg.values()])
        log.info("Looking for sample PSF in {}".format(self.rootname))
        log.debug("  based on RMS of {:9.4f}".format(threshold_rms.mean()))
        fwhm = fwhmpsf / self.pscale

        k, self.kernel_fwhm = amutils.build_auto_kernel(self.data - bkg,
                                                        self.wht_image,
                                                        threshold=threshold_rms,
                                                        fwhm=fwhm)
        if not k[1]:
            # Try one more time with slightly different background...
            self.compute_background(box_size=BKG_BOX_SIZE // 2)
            threshold_rms = np.concatenate([rms for rms in self.threshold.values()])
            bkg = np.concatenate([background for background in self.bkg.values()])
            log.info("Looking for sample PSF in {}".format(self.rootname))
            log.debug("  based on RMS of {:9.4f}".format(threshold_rms.mean()))
            fwhm = fwhmpsf / self.pscale

            k, self.kernel_fwhm = amutils.build_auto_kernel(self.data - bkg,
                                                            self.wht_image,
                                                            threshold=threshold_rms,
                                                            fwhm=fwhm)
        self.kernel, self.kernel_psf = k
        log.info("  Found PSF with FWHM = {:9.4f}".format(self.kernel_fwhm))
        # Convert FWHM from pixels back to arcseconds
        self.fwhmpsf = self.kernel_fwhm * self.pscale

    def compute_background(self, box_size=BKG_BOX_SIZE, win_size=BKG_FILTER_SIZE,
                           bkg_estimator="SExtractorBackground", rms_estimator="StdBackgroundRMS",
                           nsigma=5., threshold_flag=None):
        """Use Background2D to determine the background of each SCI chip.

        Results are stored per chip (1-based keys) in the ``bkg``,
        ``bkg_dao_rms``, ``bkg_rms_mean`` and ``threshold`` attributes.

        Parameters
        ----------
        box_size : int
            Size of box along each axis

        win_size : int
            Size of 2D filter to apply to the background image

        bkg_estimator : str or callable
            background estimation algorithm (photutils class or its name)

        rms_estimator : str or callable
            RMS estimation algorithm (photutils class or its name)

        nsigma : float
            Number of sigma above background

        threshold_flag : float or None
            Value from the image which serves as the limit for determining sources.
            If None, compute a default value of (background + nsigma * rms(background)).
            If threshold < 0.0, use absolute value as scaling factor for default value.
        """
        # Interpret estimator labels as photutils functions
        bkg_estimator = register_photutils_function(bkg_estimator)
        rms_estimator = register_photutils_function(rms_estimator)

        # Report configuration values to log
        log.debug("")
        log.debug("Computation of {} background - Input Parameters".format(self.rootname))
        log.debug("Box size: {}".format(box_size))
        log.debug("Window size: {}".format(win_size))
        log.debug("NSigma: {:9.4f}".format(nsigma))
        log.debug("BKG Estimator: {}".format(bkg_estimator.__name__))

        # SExtractorBackground and StdBackgroundRMS are the defaults
        exclude_percentiles = [10, 25, 50, 75]
        for chip in range(1, self.num_sci + 1):
            bkg = None
            bkg_dao_rms = None

            scidata = self.imghdu[('sci', chip)].data

            # Successively relax the exclusion limit until Background2D succeeds
            for percentile in exclude_percentiles:
                try:
                    bkg = Background2D(scidata, box_size, filter_size=win_size,
                                       bkg_estimator=bkg_estimator(),
                                       bkgrms_estimator=rms_estimator(),
                                       exclude_percentile=percentile, edge_method="pad")
                except Exception:
                    bkg = None
                    continue

                if bkg is not None:
                    bkg_background = bkg.background
                    bkg_dao_rms = bkg.background_rms
                    # Set the bkg_rms at "nsigma" sigma above background
                    default_threshold = bkg.background + nsigma * bkg.background_rms
                    bkg_rms_mean = bkg.background_rms_median if bkg.background_rms_median > 0 else 0.

                    if threshold_flag is None:
                        threshold = default_threshold
                    elif threshold_flag < 0:
                        # Negative flag: use |threshold_flag| as a scaling factor
                        threshold = -1 * threshold_flag * default_threshold
                        bkg_rms_mean = -1 * threshold_flag * bkg_rms_mean
                    else:
                        bkg_rms_mean = 3. * threshold_flag
                        threshold = default_threshold
                    break

            # BUGFIX: if Background2D failed for every percentile, the original
            # code went on to dereference `bkg.background` (AttributeError) and
            # `del default_threshold` (NameError).  Define flat fallback arrays
            # instead so downstream array math (e.g. build_kernel) still works.
            if bkg is None:
                bkg_rms_mean = max(0.01, self.data.min())
                bkg_dao_rms = bkg_rms_mean
                threshold = np.full_like(scidata, (nsigma + 1) * bkg_rms_mean)
                bkg_background = np.full_like(scidata, bkg_rms_mean)

            # Report other useful quantities
            log.debug("{} CHIP: {}".format(self.rootname, chip))
            log.debug("Mean background: {:9.4g}".format(bkg_rms_mean))
            log.debug("Mean threshold: {:9.4g}".format(np.mean(threshold)))
            log.debug("Mean RMS : {:9.4g}".format(bkg_rms_mean))
            log.debug("")
            log.debug("{}".format("=" * 60))

            # Make copies using deepcopy so it will work with ndarray backgrounds
            # or scalar backgrounds depending on the situation.
            self.bkg[chip] = copy.deepcopy(bkg_background)
            self.bkg_dao_rms[chip] = copy.deepcopy(bkg_dao_rms)
            self.bkg_rms_mean[chip] = copy.deepcopy(bkg_rms_mean)
            self.threshold[chip] = copy.deepcopy(threshold)

            del bkg, threshold, bkg_dao_rms, bkg_rms_mean

    def build_dqmask(self, chip=None):
        """Build a boolean mask from the DQ extension(s).

        Parameters
        ----------
        chip : int or None, optional
            1-based chip number; if None, all DQ extensions are stacked.

        Returns
        -------
        dqmask : ndarray of bool
            Mask of pixels to exclude from source identification.
        """
        # apply any DQ array, if available
        dqmask = None
        if chip:
            dqarr = self.imghdu[('DQ', chip)].data.astype(np.int16)
        else:
            dqarr = np.concatenate([self.imghdu[('DQ', i + 1)].data.astype(np.int16) for i in range(self.num_sci)])

        # "grow out" regions in DQ mask flagged as saturated by several
        # pixels in every direction to prevent the
        # source match algorithm from trying to match multiple sources
        # from one image to a single source in the
        # other or vice-versa.
        # Create temp DQ mask containing all pixels flagged with any value EXCEPT 256
        non_sat_mask = bitfield_to_boolean_mask(dqarr, ignore_flags=256 + 2048)

        # Create temp DQ mask containing saturated pixels ONLY
        sat_mask = bitfield_to_boolean_mask(dqarr, ignore_flags=~(256 + 2048))

        # Ignore sources where only a couple of pixels are flagged as saturated
        sat_mask = ndimage.binary_erosion(sat_mask, iterations=1)

        # Grow out saturated pixels by a few pixels in every direction
        grown_sat_mask = ndimage.binary_dilation(sat_mask, iterations=5)

        # combine the two temporary DQ masks into a single composite DQ mask.
        dqmask = np.bitwise_and(non_sat_mask, grown_sat_mask)

        # astropy's code returned the opposite bitmask from what was originally
        # defined by stsci.tools own bitmask code.
        if Version(stsci.tools.__version__) >= Version('4.0.0'):
            dqmask = ~dqmask

        return dqmask

    def find_alignment_sources(self, output=True, dqname='DQ', crclean=False,
                               **alignment_pars):
        """Find sources in all chips for this exposure.

        Populates ``self.catalog_table`` (keyed by chip number) and closes
        the HDU handle when done.
        """
        if crclean:
            # Re-open in update mode so the DQ arrays can be amended in place
            self.imghdu = fits.open(self.imgname, mode='update')

        for chip in range(1, self.num_sci + 1):
            # find sources in image
            if output:
                outroot = '{}_sci{}_src'.format(self.rootname, chip)
            else:
                outroot = None

            dqmask = self.build_dqmask(chip=chip)
            sciarr = self.imghdu[("SCI", chip)].data.copy()
            # Turning off 'classify' since same CRs are being removed before segmentation now
            extract_pars = {'classify': False,  # alignment_pars['classify'],
                            'centering_mode': alignment_pars['centering_mode'],
                            'nlargest': alignment_pars['MAX_SOURCES_PER_CHIP'],
                            'deblend': alignment_pars['deblend']}

            with warnings.catch_warnings():
                warnings.simplefilter('ignore', NoDetectionsWarning)
                seg_tab, segmap, crmap = amutils.extract_sources(sciarr, dqmask=dqmask,
                                                                 outroot=outroot,
                                                                 kernel=self.kernel,
                                                                 segment_threshold=self.threshold[chip],
                                                                 dao_threshold=self.bkg_rms_mean[chip],
                                                                 fwhm=self.kernel_fwhm,
                                                                 **extract_pars)
            if crclean and crmap is not None:
                i = self.imgname.replace('.fits', '')
                if log.level < logutil.logging.INFO:
                    # apply crmap to input image
                    crfile = "{}_crmap_dq{}.fits".format(i, chip)
                    fits.PrimaryHDU(data=crmap).writeto(crfile, overwrite=True)
                log.debug("Updating DQ array for {} using single-image CR identification algorithm".format(i))
                # Fold the identified CR pixels into the DQ extension
                self.imghdu[(dqname, chip)].data = np.bitwise_or(self.imghdu[(dqname, chip)].data, crmap)
                del crmap

            self.catalog_table[chip] = seg_tab

        if crclean and log.level < logutil.logging.INFO:
            # Preserve the CR-cleaned version of the exposure for diagnostics
            self.imghdu.writeto(self.imgname.replace('.fits', '_crmap.fits'), overwrite=True)

        if self.imghdu is not None:
            self.imghdu.close()
            self.imghdu = None
class SBCHAPImage(HAPImage):
    """HAPImage specialization for ACS/SBC exposures.

    Replaces the default segmentation-based source finding with a
    LoG (Laplacian-of-Gaussian) point-source detection tuned to the
    pixelated SBC PSFs, followed by DAOStarFinder centroiding.
    """

    def __init__(self, filename):
        super().__init__(filename)
        # Guard against zero (or very short) exposure times
        self.exptime = max(self.imghdu[0].header['exptime'], 1.0)

    def find_alignment_sources(self, output=True, dqname='DQ', crclean=False,
                               **alignment_pars):
        """Find sources in all chips for this exposure.

        Stores the resulting catalog (or None if nothing was detected) in
        ``self.catalog_table[1]`` and closes the HDU handle when done.
        The ``dqname``/``crclean`` parameters are accepted for interface
        compatibility with ``HAPImage.find_alignment_sources`` but are
        not used here.
        """
        # Only 1 chip, no need to loop
        chip = 1
        # find sources in image
        if output:
            outroot = '{}_sci{}_src'.format(self.rootname, chip)
        else:
            outroot = None

        sciarr = self.imghdu[("SCI", chip)].data.copy()

        # Remove all background noise
        # This background noise is effectively integerized by the SBC detector
        sci_gauss = ndimage.gaussian_filter(sciarr, sigma=3.0)
        _, bkg_noise_median, bkg_noise_stddev = astropy.stats.sigma_clipped_stats(sci_gauss, maxiters=1)
        # bkg_noise = astropy.stats.sigma_clipped_stats(sciarr[sciarr > 0], maxiters=1)
        bkg_limit = bkg_noise_median + 3 * bkg_noise_stddev  # Set limit as n-sigma above mean
        log.debug(f"BKG_LIMIT for source identification: {bkg_limit} based on {bkg_noise_median},{bkg_noise_stddev}")
        # create mask of all background pixels and zero out all those pixels
        bkg_mask = sci_gauss <= bkg_limit
        sci_bkgsub = sciarr.copy()
        sci_bkgsub[bkg_mask] = 0

        # A high LoG sigma value is needed to smooth over the pixelated nature of the SBC PSFs
        # This avoids having PSF halo peaks being detected as 'real' sources
        slabels, snum, _ = amutils.detect_point_sources(sci_bkgsub, background=0, log_sigma=5.0)
        # create mask based on these sources only...
        bkg_mask = ~np.clip(slabels, 0, 1).astype(bool)

        # Measure sub-pixel positions of each identified source now.
        # NOTE(review): self.kernel_fwhm is None until build_kernel() has run --
        # presumably compute_background()/build_kernel() are always called first;
        # verify against callers.
        daofind = DAOStarFinder(fwhm=self.kernel_fwhm, threshold=0)
        log.debug("Determining source properties as src_table...")
        src_table = daofind(sciarr, mask=bkg_mask)
        del sci_bkgsub, bkg_mask, sci_gauss  # explicitly clean up memory

        if src_table is not None:
            log.info("Total Number of detected sources: {}".format(len(src_table)))
        else:
            # Nothing detected: record an empty result and release the HDUs
            log.info("No detected sources!")
            self.catalog_table[chip] = None
            if self.imghdu is not None:
                self.imghdu.close()
                self.imghdu = None
            return

        # Photometric calibration keywords needed to convert fluxes to magnitudes
        photvals = {}
        photvals['photmode'] = self.imghdu[('sci', 1)].header['PHOTMODE']
        photvals['photflam'] = self.imghdu[('sci', 1)].header['PHOTFLAM']
        photvals['photplam'] = self.imghdu[('sci', 1)].header['PHOTPLAM']

        # Include magnitudes for each source for use in verification of alignment through
        # comparison with GAIA magnitudes
        tbl = amutils.compute_photometry(src_table, photvals)
        # Insure all IDs are sequential and unique (at least in this catalog)
        tbl['cat_id'] = np.arange(1, len(tbl) + 1)

        if outroot:
            tbl['xcentroid'].info.format = '.10f'  # optional format
            tbl['ycentroid'].info.format = '.10f'
            tbl['flux'].info.format = '.10f'
            if not outroot.endswith('.cat'):
                outroot += '.cat'
            tbl.write(outroot, format='ascii.commented_header', overwrite=True)
            log.info("Wrote source catalog: {}".format(outroot))

        self.catalog_table[chip] = tbl

        if self.imghdu is not None:
            self.imghdu.close()
            self.imghdu = None
# ----------------------------------------------------------------------------------------------------------------------
def match_relative_fit(imglist, reference_catalog, **fit_pars):
    """Perform cross-matching and final fit using relative matching algorithm

    The fitting performed by this algorithm first aligns all input images specified
    in the ``imglist`` to the FIRST image in that list. When this fit is successful,
    it insures that the images are aligned RELATIVE to each other. This set of co-aligned
    WCSs are then fit to the specified ``reference_catalog`` to improve the absolute
    astrometry for all input images (when successful).

    Parameters
    ----------
    imglist : list
        List of input image
        `tweakwcs.correctors.FITSWCSCorrector
        <https://tweakwcs.readthedocs.io/en/latest/source/correctors.html#tweakwcs.correctors.FITSWCSCorrector>`_
        objects with metadata and source catalogs

    reference_catalog : Table
        Astropy Table of reference sources for this field

    fit_pars : dict
        Set of parameters and values to be used for the fit. This should include
        ``fitgeom`` as well as any `tweakwcs.XYXYMatch
        <https://tweakwcs.readthedocs.io/en/latest/source/matchutils.html>`_
        parameter which the user feels needs to be adjusted to work best with the input data.

    Returns
    --------
    imglist : list
        List of input image
        `tweakwcs.correctors.FITSWCSCorrector
        <https://tweakwcs.readthedocs.io/en/latest/source/correctors.html#tweakwcs.correctors.FITSWCSCorrector>`_
        objects with metadata and source catalogs
    """
    log.info("{} (match_relative_fit) Cross matching and fitting {}".format("-" * 20, "-" * 27))
    # Extract the fit geometry for the ABSOLUTE (step 2) fit; anything left
    # in fit_pars is passed through to XYXYMatch.
    if 'fitgeom' in fit_pars:
        fitgeom = fit_pars['fitgeom']
        del fit_pars['fitgeom']
    else:
        fitgeom = 'rscale'
    # The RELATIVE (step 1) fit always uses 'rscale'
    rel_fitgeom = 'rscale'

    common_pars = fit_pars['pars']
    del fit_pars['pars']

    nclip = 1 if fitgeom == 'rscale' else 0  # Only perform sigma-clipping for 'rscale'

    # 0: Specify matching algorithm to use
    match = XYXYMatch(**fit_pars)
    # match = XYXYMatch(searchrad=250, separation=0.1, tolerance=100, use2dhist=False)
    # Align images and correct WCS
    # NOTE: this invocation does not use an astrometric catalog. This call
    #       allows all the input images to be aligned in
    #       a relative way using the first input image as the reference.
    # 1: Perform relative alignment
    # Setting 'minobj' to None allows 'tweakwcs' to use its own built-in
    # limits.
    match_relcat = align_wcs(imglist, None, match=match,
                             minobj=common_pars['minobj'][rel_fitgeom],
                             expand_refcat=True, fitgeom=rel_fitgeom,
                             nclip=1)

    # Implement a consistency check even before trying absolute alignment
    # If relative alignment in question, no use in aligning to GAIA
    if not check_consistency(imglist):
        return imglist

    log.info("Relative alignment found: ")
    for i in imglist:
        info = i.meta['fit_info']
        if 'shift' not in info:
            # No fit computed for this image; report null/sentinel values
            off = (0., 0.)
            rot = 0.
            scale = -1.
            nmatches = 0
        else:
            off = info['shift']
            rot = info['<rot>']
            scale = info['<scale>']
            nmatches = info['nmatches']
        msg = "Image {} --".format(i.meta['name'])
        msg += "\n SHIFT:({:9.4f},{:9.4f}) NMATCHES: {} ".format(off[0], off[1], nmatches)
        msg += "\n ROT:{:9.4f} SCALE:{:9.4f}".format(rot, scale)
        msg += "\n Using fitgeom = '{}'".format(rel_fitgeom)
        log.info(msg)

    # This logic enables performing only relative fitting and skipping fitting to GAIA
    if reference_catalog is not None:
        # Set all the group_id values to be the same so the various images/chips
        # will be aligned to the astrometric reference catalog as an ensemble.
        # BEWARE: If additional iterations of solutions are to be
        # done, the group_id values need to be restored.
        for image in imglist:
            image.meta["group_id"] = 1234567
        # 2: Perform absolute alignment
        matched_cat = align_wcs(imglist, reference_catalog, match=match,
                                minobj=common_pars['minobj'][fitgeom],
                                fitgeom=fitgeom, nclip=nclip)

    # Insure the expanded reference catalog has all the information needed
    # to complete processing.
    # reference_catalog = match_relcat
    # reference_catalog['mag'] = np.array([-999.9] * len(reference_catalog),
    #                                     np.float32)
    # reference_catalog.meta['catalog'] = 'relative'

    # 3: Interpret RMS values from tweakwcs
    interpret_fit_rms(imglist, reference_catalog)

    del match_relcat
    return imglist
# ----------------------------------------------------------------------------------------------------------
def match_default_fit(imglist, reference_catalog, **fit_pars):
    """Perform cross-matching and final fit using default tolerance matching

    This function performs the specified type of fit ('general', 'rscale', ...) directly
    between all input images and the reference catalog.  If successful, each input image
    will have its absolute WCS aligned to the reference catalog.

    Parameters
    ----------
    imglist : list
        List of input image
        `tweakwcs.correctors.FITSWCSCorrector
        <https://tweakwcs.readthedocs.io/en/latest/source/correctors.html#tweakwcs.correctors.FITSWCSCorrector>`_
        objects with metadata and source catalogs

    reference_catalog : Table
        Astropy Table of reference sources for this field

    fit_pars : dict
        Set of parameters and values to be used for the fit.  This should include
        ``fitgeom`` as well as any `tweakwcs.XYXYMatch
        <https://tweakwcs.readthedocs.io/en/latest/source/matchutils.html>`_
        parameter which the user feels needs to be adjusted to work best with the input data.

    Returns
    --------
    imglist : list
        List of input image
        `tweakwcs.correctors.FITSWCSCorrector
        <https://tweakwcs.readthedocs.io/en/latest/source/correctors.html#tweakwcs.correctors.FITSWCSCorrector>`_
        objects with metadata and source catalogs
    """
    # Pull the fit-control entries out of fit_pars; everything that remains is
    # passed straight through to XYXYMatch as a matching parameter.
    fitgeom = fit_pars.pop('fitgeom', 'rscale')
    common_pars = fit_pars.pop('pars')

    nclip = 1 if fitgeom == 'rscale' else 0  # Only perform sigma-clipping for 'rscale'

    log.info("{} (match_default_fit) Cross matching and fitting "
             "{}".format("-" * 20, "-" * 27))

    # Specify matching algorithm to use
    match = XYXYMatch(**fit_pars)
    # Align images and correct WCS
    matched_cat = align_wcs(imglist, reference_catalog, match=match,
                            minobj=common_pars['minobj'][fitgeom],
                            expand_refcat=False, fitgeom=fitgeom, nclip=nclip)

    # Interpret RMS values from tweakwcs
    interpret_fit_rms(imglist, reference_catalog)

    del matched_cat
    return imglist
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def match_2dhist_fit(imglist, reference_catalog, **fit_pars):
    """Perform cross-matching and final fit using 2dHistogram matching

    This function performs cross-matching of each separate input image to the
    sources in the reference catalog by looking for common integer offsets between all
    sources in the input list and all sources in the reference catalog.  This
    offset is then used as the starting point for the final fit to the reference
    catalog to align each input image SEPARATELY to the reference catalog.

    Parameters
    ----------
    imglist : list
        List of input image
        `tweakwcs.correctors.FITSWCSCorrector
        <https://tweakwcs.readthedocs.io/en/latest/source/correctors.html#tweakwcs.correctors.FITSWCSCorrector>`_
        objects with metadata and source catalogs

    reference_catalog : Table
        Astropy Table of reference sources for this field

    fit_pars : dict
        Set of parameters and values to be used for the fit.  This should include
        ``fitgeom`` as well as any `tweakwcs.XYXYMatch
        <https://tweakwcs.readthedocs.io/en/latest/source/matchutils.html>`_
        parameter which the user feels needs to be adjusted to work best with the input data.

    Returns
    --------
    imglist : list
        List of input image
        `tweakwcs.correctors.FITSWCSCorrector
        <https://tweakwcs.readthedocs.io/en/latest/source/correctors.html#tweakwcs.correctors.FITSWCSCorrector>`_
        objects with metadata and source catalogs
    """
    # Pull the fit-control entries out of fit_pars; everything that remains is
    # passed straight through to XYXYMatch as a matching parameter.
    fitgeom = fit_pars.pop('fitgeom', 'rscale')
    common_pars = fit_pars.pop('pars')

    nclip = 1 if fitgeom == 'rscale' else 0  # Only perform sigma-clipping for 'rscale'

    log.info("{} (match_2dhist_fit) Cross matching and fitting "
             "{}".format("-" * 20, "-" * 28))

    # Specify matching algorithm to use
    match = XYXYMatch(**fit_pars)
    # Align images and correct WCS
    matched_cat = align_wcs(imglist, reference_catalog, match=match,
                            minobj=common_pars['minobj'][fitgeom],
                            expand_refcat=False, fitgeom=fitgeom, nclip=nclip)

    # Interpret RMS values from tweakwcs
    interpret_fit_rms(imglist, reference_catalog)

    del matched_cat
    return imglist
# ----------------------------------------------------------------------------------------------------------
def check_consistency(imglist, rot_tolerance=0.1, shift_tolerance=1.0):
    """Check to see whether relative alignment solutions are realistically consistent

    Should any image end up with a relative alignment fit whose rotation differs by
    more than ``rot_tolerance`` degrees from the other images' fits, this check will
    mark the fit as 'FAILED' in the ``.meta["fit_info"]["status"]`` field for
    all images.

    Parameters
    ----------
    imglist : list
        List of tweakwcs corrector objects whose ``meta['fit_info']`` entries are
        inspected; on an inconsistent solution their 'status' and 'process_msg'
        entries are updated in-place.

    rot_tolerance : float, optional
        Maximum allowed rotation difference (degrees) between any two fits.

    shift_tolerance : float, optional
        Reserved for a shift-based consistency check; currently unused.

    Returns
    -------
    is_consistent : bool
        False when any fit already FAILED or the rotations disagree by more
        than ``rot_tolerance``; True otherwise.
    """
    is_consistent = True

    # One rotation entry per image plus a trailing 0.0, so the last image is
    # also compared against the (zero-rotation) reference frame.
    rots = np.zeros(len(imglist) + 1, np.float64)
    nmatches = np.zeros(len(imglist), np.int16)
    fitgeom = 'Unknown'  # remember the fit geometry for diagnostic output

    for i, img in enumerate(imglist):
        finfo = img.meta['fit_info']
        if not finfo['status'].startswith("FAILED"):
            # Guard on the key actually read below ('proper_rot'); the original
            # tested 'rot' instead, which risked a KeyError for entries (such
            # as the reference image) carrying one key but not the other.
            if 'proper_rot' in finfo:
                rots[i] = finfo['proper_rot']
                nmatches[i] = finfo['nmatches']
            fitgeom = finfo.get('fitgeom', fitgeom)
        else:
            # 'status' == 'FAILED': Set default as negative value
            nmatches[i] = -1
            is_consistent = False

    # We should only need to check for consistency when less than 5
    # matches were used for the fit, leading to a higher potential for
    # a singular solution or mis-identified cross-matches.
    if nmatches.min() > 4:
        return is_consistent

    # Compute deltas to look for outliers.  Use the magnitude of each delta so
    # that rotations disagreeing in either direction are caught, as promised
    # by the docstring.
    delta_rots = np.abs(rots[1:] - rots[:-1])
    if delta_rots.max() >= rot_tolerance:
        for i, img in enumerate(imglist):
            msg = 'Relative consistency check failed: {}'.format(rots[i])
            img.meta['fit_info']['status'] = 'FAILED'
            img.meta['fit_info']['process_msg'] = msg
        log.info('Relative fit solution is NOT consistent!')
        log.debug('DELTAS for "{}" fit:'.format(fitgeom))
        log.debug('    max rot={:.4f}\n    '.format(delta_rots.max()))
        is_consistent = False
    else:
        log.info('Relative fit solution is consistent')

    return is_consistent
def interpret_fit_rms(tweakwcs_output, reference_catalog):
    """Interpret the FIT information to convert RMS to physical units

    Parameters
    ----------
    tweakwcs_output : list
        output of tweakwcs. Contains sourcelist tables, newly computed WCS info,
        etc. for every chip of every valid input image.
        This list gets updated, in-place, with the new RMS values; specifically,

            * 'FIT_RMS': RMS of the separations between fitted image positions and reference positions
            * 'TOTAL_RMS': mean of the FIT_RMS values for all observations
            * 'NUM_FITS': number of images/group_id's with successful fits included in the TOTAL_RMS

        These entries are added to the 'fit_info' dictionary.

    reference_catalog : astropy.Table
        Table of reference source positions used for the fit

    Returns
    -------
    Nothing
    """
    # Start by collecting information by group_id
    group_ids = [info.meta['group_id'] for info in tweakwcs_output]
    # Compress the list to have only unique group_id values to avoid some unnecessary iterations
    group_ids = list(set(group_ids))
    group_dict = {'avg_RMS': None}
    obs_rms = []
    for group_id in group_ids:
        input_mag = None
        for item in tweakwcs_output:
            tinfo = item.meta['fit_info']
            # Skip entries without a usable fit: status = FAILED (fit failed) or
            # REFERENCE (relative alignment done with the first image as the
            # reference, which carries no 'fitmask').
            # BUGFIX: the original compared against the misspelled 'SUCCESSS',
            # which made the 'fitmask' check dead code and allowed a KeyError
            # below for successful fits lacking 'fitmask'.
            if tinfo['status'] != 'SUCCESS' or 'fitmask' not in tinfo:
                continue
            # Make sure to store data for any particular group_id only once.
            if item.meta['group_id'] == group_id and \
                    group_id not in group_dict:
                group_dict[group_id] = {'ref_idx': None, 'FIT_RMS': None,
                                        'input_mag': None, 'ref_mag': None, 'input_idx': None}
                # Perform checks to insure that fit was successful:
                # flag the fit when (scale off by > 1% or rot > 0.1 degree) and skew > 1%
                if (abs(tinfo['<scale>'] - 1) > 0.01 or abs(tinfo['<rot>']) > 0.1) and \
                        tinfo['skew'] > 0.01:
                    # fit is bad
                    tinfo['status'] = 'FAILED: potential singularity in fit'

                log.debug("fit_info: {}".format(item.meta['fit_info']))

                ref_idx = tinfo['matched_ref_idx']
                fitmask = tinfo['fitmask']
                group_dict[group_id]['ref_idx'] = ref_idx
                input_RA = tinfo['fit_RA']
                input_DEC = tinfo['fit_DEC']
                ref_RA = reference_catalog[ref_idx]['RA'][fitmask]
                ref_DEC = reference_catalog[ref_idx]['DEC'][fitmask]
                img_coords = SkyCoord(input_RA, input_DEC,
                                      unit='deg', frame='icrs')
                ref_coords = SkyCoord(ref_RA, ref_DEC, unit='deg', frame='icrs')
                dra, ddec = img_coords.spherical_offsets_to(ref_coords)
                ra_rms = np.std(dra.to(u.mas))
                dec_rms = np.std(ddec.to(u.mas))
                fit_rms = np.std(Angle(img_coords.separation(ref_coords), unit=u.mas)).value

                # Get mag from reference catalog, or input image catalog as needed
                group_dict[group_id]['ref_mag'] = reference_catalog[ref_idx]['mag'][fitmask]
                group_dict[group_id]['FIT_RMS'] = fit_rms
                group_dict[group_id]['RMS_RA'] = ra_rms
                group_dict[group_id]['RMS_DEC'] = dec_rms
                input_mag = item.meta['catalog']['mag']
                group_dict[group_id]['input_mag'] = input_mag
                group_dict[group_id]['input_idx'] = tinfo['matched_input_idx']

                obs_rms.append(fit_rms)
            else:
                # Accumulate magnitudes from the remaining successful entries so
                # 'input_mag' covers the whole ensemble.
                # NOTE(review): this branch also fires for items whose group_id
                # differs from the current one — harmless when all group_ids are
                # identical (the common case), but verify for multi-group input.
                if input_mag is not None:
                    input_mag = input_mag.copy(data=np.hstack((input_mag, item.meta['catalog']['mag'])))
                    group_dict[group_id]['input_mag'] = input_mag

    # Compute RMS for entire ASN/observation set.  Guard against no successful
    # fits at all: np.mean([]) would emit a RuntimeWarning and return NaN.
    total_rms = np.mean(obs_rms) if obs_rms else float('nan')

    # Now, append computed results to tweakwcs_output
    for item in tweakwcs_output:
        group_id = item.meta['group_id']
        fitmask = item.meta['fit_info'].get('fitmask', None)
        if group_id in group_dict:
            fit_rms = group_dict[group_id]['FIT_RMS']
            ra_rms = group_dict[group_id]['RMS_RA']
            dec_rms = group_dict[group_id]['RMS_DEC']
            input_idx = group_dict[group_id]['input_idx']
            input_mag = group_dict[group_id]['input_mag'][input_idx][fitmask]
            ref_mag = group_dict[group_id]['ref_mag']
        else:
            fit_rms = None
            ra_rms = None
            dec_rms = None
            input_mag = None
            ref_mag = None

        item.meta['fit_info']['FIT_RMS'] = fit_rms
        item.meta['fit_info']['TOTAL_RMS'] = total_rms
        item.meta['fit_info']['NUM_FITS'] = len(group_ids)
        item.meta['fit_info']['RMS_RA'] = ra_rms
        item.meta['fit_info']['RMS_DEC'] = dec_rms
        item.meta['fit_info']['catalog'] = reference_catalog.meta['catalog']
        item.meta['fit_info']['input_mag'] = input_mag
        item.meta['fit_info']['ref_mag'] = ref_mag
# ----------------------------------------------------------------------------------------------------------------------
def update_image_wcs_info(tweakwcs_output, headerlet_filenames=None, fit_label=None):
    """Write newly computed WCS information to image headers and write headerlet files

    Parameters
    ----------
    tweakwcs_output : list
        output of tweakwcs. Contains sourcelist tables, newly computed WCS info,
        etc. for every chip of every valid input image.

    headerlet_filenames : dictionary, optional
        dictionary that maps the flt/flc.fits file name to the corresponding
        custom headerlet filename.

    fit_label : string, optional
        Short label to use for (part of) the name of the WCS being updated in the
        image header.  This label will be appended to the end of the full `WCSNAME`
        value generated from the `IDCTAB` name and type of fit.

    Returns
    -------
    out_headerlet_dict : dictionary
        a dictionary of the headerlet files created by this subroutine, keyed by
        flt/flc fits filename.
    """
    out_headerlet_dict = {}
    for item in tweakwcs_output:
        image_name = item.meta['filename']
        chipnum = item.meta['chip']
        hdulist = fits.open(image_name, mode='update')

        # start by insuring that the current version of STWCS and Astropy are recorded
        # in the header, not the versions used to create the previous WCS
        # This logic comes from STWCS in order to be compatible with STWCS in
        # terms of where these keywords should be found in the PRIMARY header.
        upwcsver = stwcs.__version__
        pywcsver = astropy.__version__

        log.info('Updating PRIMARY header with:')
        log.info('  UPWCSVER = {}'.format(upwcsver))
        log.info('  PYWCSVER = {}'.format(pywcsver))

        if 'HISTORY' in hdulist[0].header:
            after_kw = None
            before_kw = 'HISTORY'
        elif 'ASN_MTYP' in hdulist[0].header:
            after_kw = 'ASN_MTYP'
            before_kw = None
        else:
            after_kw = hdulist[0].header.cards[-1][0]
            before_kw = None

        hdulist[0].header.set('UPWCSVER', value=upwcsver,
                              comment="Version of STWCS used to update the WCS",
                              after=after_kw, before=before_kw)
        hdulist[0].header.set('PYWCSVER', value=pywcsver,
                              comment="Version of Astropy used to update the WCS",
                              after='UPWCSVER')

        # The WCS name and SCI-extension mapping are derived once per image,
        # when its first chip is encountered.
        # NOTE(review): 'sci_ext_dict'/'wcs_name'/'num_sci_ext' are only defined
        # after a chipnum == 1 entry has been seen; this assumes the first chip
        # of each image precedes its other chips in tweakwcs_output — confirm
        # against the callers.
        if chipnum == 1:
            chipctr = 1
            num_sci_ext = amutils.countExtn(hdulist)

            # generate wcs name for updated image header, headerlet
            # Just in case header value 'wcs_name' is empty.
            if fit_label is None:
                if 'relative' in item.meta['fit method']:
                    fit_label = 'REL'
                else:
                    fit_label = 'IMG'

            if not hdulist['SCI', 1].header['WCSNAME'] or hdulist['SCI', 1].header['WCSNAME'] == "":
                wcs_name = "FIT_{}_{}".format(fit_label, item.meta['catalog_name'])
            else:
                wname = hdulist['sci', 1].header['wcsname']
                if "-" in wname:
                    wcs_name = '{}-FIT_{}_{}'.format(wname[:wname.index('-')],
                                                     fit_label,
                                                     item.meta['fit_info']['catalog'])
                else:
                    wcs_name = '{}-FIT_{}_{}'.format(wname, fit_label,
                                                     item.meta['fit_info']['catalog'])

            # establish correct mapping to the science extensions
            try:
                sci_ext_dict = {}
                for sci_ext_ctr in range(1, num_sci_ext + 1):
                    sci_ext_dict["{}".format(sci_ext_ctr)] = fileutil.findExtname(hdulist, 'sci', extver=sci_ext_ctr)
            except Exception:
                exc_type, exc_value, exc_tb = sys.exc_info()
                traceback.print_exception(exc_type, exc_value, exc_tb, file=sys.stdout)
                # BUGFIX: the 'logging' module was never imported in this file;
                # the original 'logging.exception(...)' raised a NameError.
                # Use the module-level logger instead.
                log.exception("Failed to map SCI extensions for {}".format(image_name))

        # update header with new WCS info
        sci_extn = sci_ext_dict["{}".format(item.meta['chip'])]
        hdr_name = "{}_{}-hlet.fits".format(image_name.rstrip("fits")[:-1], wcs_name)
        updatehdr.update_wcs(hdulist, sci_extn, item.wcs, wcsname=wcs_name, reusename=True)

        info = item.meta['fit_info']
        if info['catalog'] and info['catalog'] != '':
            # Explicitly report the RMS values in units of mas.
            rms_ra_val = info['RMS_RA'].mas if info['RMS_RA'] is not None else -1.0
            rms_dec_val = info['RMS_DEC'].mas if info['RMS_DEC'] is not None else -1.0
            hdulist[sci_extn].header['RMS_RA'] = (rms_ra_val, RMS_RA_COMMENT)
            hdulist[sci_extn].header['RMS_DEC'] = (rms_dec_val, RMS_DEC_COMMENT)

            # convert RMS values from units of mas to deg in order to be consistent
            # with CUNIT keyword value, as per FITS Paper I standard.
            # https://www.aanda.org/articles/aa/full/2002/45/aah3859/aah3859.right.html Section 2.6
            cr1_comment = RMS_RA_COMMENT.replace('mas', 'deg')
            cr2_comment = RMS_DEC_COMMENT.replace('mas', 'deg')
            hdulist[sci_extn].header['CRDER1'] = (info['RMS_RA'].deg, cr1_comment) if info['RMS_RA'] is not None else -1.0
            hdulist[sci_extn].header['CRDER2'] = (info['RMS_DEC'].deg, cr2_comment) if info['RMS_DEC'] is not None else -1.0
            hdulist[sci_extn].header['NMATCHES'] = len(info['ref_mag']) if info['ref_mag'] is not None else 0
            hdulist[sci_extn].header['FITGEOM'] = info['fitgeom'] if info['fitgeom'] is not None else 'N/A'
        else:
            # No catalog: record sentinel values so the keywords always exist.
            hdulist[sci_extn].header['RMS_RA'] = (-1.0, RMS_RA_COMMENT)
            hdulist[sci_extn].header['RMS_DEC'] = (-1.0, RMS_DEC_COMMENT)
            hdulist[sci_extn].header['CRDER1'] = -1.0
            hdulist[sci_extn].header['CRDER2'] = -1.0
            hdulist[sci_extn].header['NMATCHES'] = 0
            hdulist[sci_extn].header['FITGEOM'] = "N/A"

        # Update value of 'nmatches' in fit_info so that this value will get
        # used in writing out the headerlet as a file.
        info['nmatches'] = hdulist[sci_extn].header['NMATCHES']

        if 'HDRNAME' in hdulist[sci_extn].header:
            del hdulist[sci_extn].header['HDRNAME']
        hdulist[sci_extn].header['HDRNAME'] = hdr_name
        hdulist.flush()
        hdulist.close()

        # Create headerlet
        out_headerlet = headerlet.create_headerlet(image_name, hdrname=hdr_name,
                                                   wcsname=wcs_name, logging=False)

        # Update headerlet
        update_headerlet_phdu(item, out_headerlet)

        # Write headerlet
        if headerlet_filenames:
            # Use HAP-compatible filename defined in runhlaprocessing.py
            headerlet_filename = headerlet_filenames[image_name]
        else:
            if image_name.endswith("flc.fits"):
                headerlet_filename = image_name.replace("flc", "flt_hlet")
            elif image_name.endswith("flt.fits"):
                headerlet_filename = image_name.replace("flt", "flt_hlet")
            else:
                # BUGFIX: other suffixes previously left 'headerlet_filename'
                # undefined, raising NameError at the writeto() below.
                headerlet_filename = image_name.replace(".fits", "_hlet.fits")

        out_headerlet.writeto(headerlet_filename, overwrite=True)
        log.info("Wrote headerlet file {}.\n\n".format(headerlet_filename))
        out_headerlet_dict[image_name] = headerlet_filename

        # Attach headerlet as HDRLET extension
        if headerlet.verify_hdrname_is_unique(hdulist, hdr_name):
            headerlet.attach_headerlet(image_name, headerlet_filename, logging=False)

        chipctr += 1
    return out_headerlet_dict
# --------------------------------------------------------------------------------------------------------------
def update_headerlet_phdu(tweakwcs_item, headerlet):
    """Update the primary header data unit keywords of a headerlet object in-place

    Parameters
    ==========
    tweakwcs_item :
        Basically the output from tweakwcs which contains the cross match and fit
        information for every chip of every valid input image.

    headerlet : headerlet object
        object containing WCS information
    """
    # Get the data to be used as values for FITS keywords.  If no fit was
    # performed ('nmatches' absent), leave the headerlet untouched.
    info = tweakwcs_item.meta['fit_info']
    if 'nmatches' in info:
        rms_ra = info['RMS_RA'].value if info['RMS_RA'] is not None else -1.0
        # BUGFIX: guard on RMS_DEC itself; the original tested RMS_RA here,
        # which would raise AttributeError whenever only RMS_DEC was None.
        rms_dec = info['RMS_DEC'].value if info['RMS_DEC'] is not None else -1.0
        fit_rms = info['FIT_RMS']
        nmatch = info['nmatches']
        catalog = info['catalog']
        fit_method = tweakwcs_item.meta['fit method']

        x_shift = (info['shift'])[0]
        y_shift = (info['shift'])[1]
        rot = info['rot'][1]  # report rotation of Y axis only
        scale = info['<scale>']
        skew = info['skew']

        # Update the existing FITS keywords
        primary_header = headerlet[0].header
        primary_header['RMS_RA'] = (rms_ra, RMS_RA_COMMENT)
        primary_header['RMS_DEC'] = (rms_dec, RMS_DEC_COMMENT)
        primary_header['NMATCH'] = nmatch
        primary_header['CATALOG'] = catalog
        primary_header['FITMETH'] = fit_method

        # Create a new FITS keyword
        primary_header['FIT_RMS'] = (fit_rms, 'RMS (mas) of the 2D fit of the headerlet solution')

        # Create the set of HISTORY keywords
        primary_header['HISTORY'] = '~~~~~ FIT PARAMETERS ~~~~~'
        primary_header['HISTORY'] = '{:>15} : {:9.4f} "/pixels'.format('platescale', tweakwcs_item.wcs.pscale)
        primary_header['HISTORY'] = '{:>15} : {:9.4f} pixels'.format('x_shift', x_shift)
        primary_header['HISTORY'] = '{:>15} : {:9.4f} pixels'.format('y_shift', y_shift)
        primary_header['HISTORY'] = '{:>15} : {:9.4f} degrees'.format('rotation', rot)
        primary_header['HISTORY'] = '{:>15} : {:9.4f}'.format('scale', scale)
        primary_header['HISTORY'] = '{:>15} : {:9.4f}'.format('skew', skew)
# --------------------------------------------------------------------------------------------------------------
def register_photutils_function(name):
    """Convert photutils name as a string into a pointer to the actual photutils function

    The lookup is performed on the ``photutils.background`` module; an
    AttributeError propagates to the caller when no such function exists.
    """
    return getattr(background, name)
# --------------------------------------------------------------------------------------------------------------
########################################################################################
# Author: Ujash Joshi, University of Toronto, 2017 #
# Based on Octave implementation by: Benjamin Eltzner, 2014 <b.eltzner@gmx.de> #
# Octave/Matlab normxcorr2 implementation in python 3.5 #
# Details: #
# Normalized cross-correlation. Similiar results upto 3 significant digits. #
# https://github.com/Sabrewarrior/normxcorr2-python/master/norxcorr2.py #
# http://lordsabre.blogspot.ca/2017/09/matlab-normxcorr2-implemented-in-python.html #
########################################################################################
def normxcorr2(template, image, mode="full"):
"""
Input arrays should be floating point numbers.
:param template: N-D array, of template or filter you are using for cross-correlation.
Must be less or equal dimensions to image.
Length of each dimension must be less than length of image.
:param image: N-D array
:param mode: Options, "full", "valid", "same"
full (Default): The output of fftconvolve is the full discrete linear convolution of the inputs.
Output size will be image size + 1/2 template size in each dimension.
valid: The output consists only of those elements that do not rely on the zero-padding.
same: The output is the same size as image, centered with respect to the ‘full’ output.
:return: N-D array of same dimensions as image. Size depends on mode parameter.
"""
# If this happens, it is probably a mistake
if np.ndim(template) > np.ndim(image) or \
len([i for i in range(np.ndim(template)) if template.shape[i] > image.shape[i]]) > 0:
log.warning("normxcorr2: TEMPLATE larger than IMG. Arguments may be swapped.")
template = template - np.mean(template)
image = image - np.mean(image)
a1 = np.ones(template.shape)
# Faster to flip up down and left right then use fftconvolve instead of scipy's correlate
ar = np.flipud(np.fliplr(template))
out = fftconvolve(image, ar.conj(), mode=mode)
image = fftconvolve(np.square(image), a1, mode=mode) - \
np.square(fftconvolve(image, a1, mode=mode)) / (np.prod(template.shape))
# Remove small machine precision errors after subtraction
image[np.where(image < 0)] = 0
template = np.sum(np.square(template))
out = out / np.sqrt(image * template)
# Remove any divisions by 0 or very close to 0
out[np.where(np.logical_not(np.isfinite(out)))] = 0
return out
def compute_xcorr_offset(image, refimage, window=32):
    """Use normxcorr2 to determine sub-pixel offset between two images.

    Usage
    -----
    flcs = sorted(glob.glob('*fl?.fits'))
    imglist = [fits.getdata(f, ext=0) for f in flcs]
    # determine position of central source to be aligned
    xcen = 512
    ycen = 563
    # create slice tuple for subarray around that position to be used for alignment
    imgslice = (slice(ycen-256, ycen+256), slice(xcen-256, xcen+256))
    # perform alignment relative to first image in list
    offset = compute_xcorr_offset(imglist[1][imgslice], imglist[0][imgslice])

    Parameters
    ----------
    image : ndarray
        Numpy array of image to be aligned

    refimage : ndarray
        Numpy array of image which input image should be aligned to

    window : int, optional
        Size (in pixels) of the region around the correlation peak used for
        the Gaussian fit.

    Returns
    -------
    offset : tuple
        X,Y tuple of the sub-pixel offset between the two images.
    """
    xcorr_img = normxcorr2(image, refimage)

    # determine initial guess (to a pixel) of the peak
    peak_y, peak_x = (int(v) for v in
                      np.unravel_index(np.argmax(xcorr_img), xcorr_img.shape))

    # Only fit the region extending out to window//2 from the peak position.
    # BUGFIX: clamp the slice at the array edges and size the coordinate grids
    # from the actual region; the previous unclamped slice produced a region
    # smaller than window x window (or a wrapped negative-index slice) for a
    # peak near an edge, crashing the fit with mismatched shapes.
    half = window // 2
    y0 = max(peak_y - half, 0)
    x0 = max(peak_x - half, 0)
    peak_img = xcorr_img[y0:peak_y + half, x0:peak_x + half]
    y, x = np.mgrid[:peak_img.shape[0], :peak_img.shape[1]]

    # Now, fit a Gaussian to the peak to look for sub-pixel offset.  The
    # integer peak sits at (peak_x - x0, peak_y - y0) in slice coordinates;
    # for an interior peak this is exactly (window//2, window//2) as before.
    p_model = models.Gaussian2D(x_mean=peak_x - x0, y_mean=peak_y - y0)
    fit_p = fitting.LevMarLSQFitter()
    p = fit_p(p_model, x, y, peak_img)

    # compute offset of the fitted (sub-pixel) peak from the integer peak
    offset = ((peak_x - x0) - p.x_mean.value, (peak_y - y0) - p.y_mean.value)
    return offset