Source code for drizzlepac.align
#!/usr/bin/env python
"""This script is a modernized implementation of tweakreg.
"""
import copy
import datetime
import sys
import glob
import math
import os
import pickle
from collections import OrderedDict
import traceback
import numpy as np
from astropy.table import Table
from astropy.io import fits
from stsci.tools import logutil
from .haputils import astrometric_utils as amutils
from .haputils import astroquery_utils as aqutils
from .haputils import get_git_rev_info
from .haputils import align_utils
from .haputils import config_utils
from . import __version__
__taskname__ = "align"
MSG_DATEFMT = "%Y%j%H%M%S"
SPLUNK_MSG_FORMAT = "%(asctime)s %(levelname)s src=%(name)s- %(message)s"
def _init_logger():
log = logutil.create_logger(
__name__,
level=logutil.logging.NOTSET,
stream=sys.stdout,
format=SPLUNK_MSG_FORMAT,
datefmt=MSG_DATEFMT,
)
return log
log = _init_logger()
# Initial values for the module log filename and the associated file handler used for the log
module_fh = None
module_logfile = ""
# ----------------------------------------------------------------------------------------------------------
def check_and_get_data(input_list: list, **pars: object) -> list:
"""Verify that all specified files are present. If not, retrieve them from MAST.
This function relies on the `AstroQuery interface to MAST
<https://astroquery.readthedocs.io/en/latest/mast/mast.html>`_
to retrieve the exposures from the ``input_list`` that are not found in the current directory. This
function calls the simplified interface in
:func:`haputils/astroquery_utils/retrieve_observation`
to get the files through AstroQuery.
Parameters
----------
input_list : list
List of one or more calibrated fits images that will be used for catalog generation.
pars : dict
Set of additional parameters for :func:`haputils/astroquery_utils/retrieve_observation`
to use in retrieving any data that was not already in the current working directory.
Returns
=======
total_input_list: list
list of full filenames
See Also
========
haputils/astroquery_utils/retrieve_observation
"""
empty_list = []
retrieve_list = [] # Actual files retrieved via astroquery and resident on disk
candidate_list = [] # File names gathered from *_asn.fits file
ipppssoot_list = [] # ipppssoot names used to avoid duplicate downloads
total_input_list = [] # Output full filename list of data on disk
# Loop over the input_list to determine if the item in the input_list is a full association file
# (*_asn.fits), a full individual image file (aka singleton, *_flt.fits), or a root name specification
# (association or singleton, ipppssoot).
for input_item in input_list:
log.info("Input item: {}".format(input_item))
indx = input_item.find("_")
# Input with a suffix (_xxx.fits)
if indx != -1:
lc_input_item = input_item.lower()
suffix = lc_input_item[indx + 1 : indx + 4]
log.info("file: {}".format(lc_input_item))
# For an association, need to open the table and read the image names as this could
# be a custom association. The assumption is this file is on local disk when specified
# in this manner (vs just the ipppssoot of the association).
# This "if" block just collects the wanted full file names.
if suffix == "asn":
candidate_list.extend(_get_asn_members(input_item))
elif suffix in ["flc", "flt", "c0m"]:
if lc_input_item not in candidate_list:
candidate_list.append(lc_input_item)
else:
log.error(
'Inappropriate file suffix: {}. Looking for "asn.fits", '
'"flc.fits", or "flt.fits".'.format(suffix)
)
return empty_list
# Input is an ipppssoot (association or singleton), nine characters by definition.
# This "else" block actually downloads the data specified as ipppssoot.
elif len(input_item) == 9:
try:
if input_item not in ipppssoot_list:
input_item = input_item.lower()
# An ipppssoot of an individual file which is part of an association cannot be
# retrieved from MAST
retrieve_list = aqutils.retrieve_observation(input_item, **pars)
# If the retrieved list is not empty, add filename(s) to the total_input_list.
# Also, update the ipppssoot_list so we do not try to download the data again. Need
# to do this since retrieve_list can be empty because (1) data cannot be acquired (error)
# or (2) data is already on disk (ok).
if retrieve_list:
total_input_list += retrieve_list
ipppssoot_list.append(input_item)
else:
# log.error('File {} cannot be retrieved from MAST.'.format(input_item))
# return(empty_list)
log.warning(
"File {} cannot be retrieved from MAST.".format(input_item)
)
log.warning(f" using pars: {pars}")
# look for already downloaded ASN and related files instead
# ASN filenames are the only ones that end in a digit
if input_item[-1].isdigit():
_asn_name = f"{input_item}_asn.fits"
if not os.path.exists(_asn_name):
_ = aqutils.retrieve_observation(
[f"{input_item}"], suffix=["ASN"], clobber=True
)
_local_files = _get_asn_members(_asn_name)
if _local_files:
log.warning(
f"Using local files instead:\n {_local_files}"
)
total_input_list.extend(_local_files)
else:
_lfiles = os.listdir()
log.error(
f"No suitable files found for input {input_item}"
)
log.error(f" in directory with files: \n {_lfiles}")
return total_input_list
except Exception:
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_tb, file=sys.stdout)
# Only the retrieve_list files via astroquery have been put into the total_input_list thus far.
# Now check candidate_list to detect or acquire the requested files from MAST via astroquery.
for file in candidate_list:
# If the file is found on disk, add it to the total_input_list and continue
if glob.glob(file):
total_input_list.append(file)
continue
else:
log.error("File {} cannot be found on the local disk.".format(file))
return empty_list
log.info("TOTAL INPUT LIST: {}".format(total_input_list))
return total_input_list
# ----------------------------------------------------------------------------------------------------------
def _get_asn_members(asnfile):
# default ASN member type
member_suffix = "_flc.fits"
candidate_list = []
try:
asntab = Table.read(asnfile, format="fits")
except FileNotFoundError:
log.error("File {} not found.".format(asnfile))
return []
for row in asntab:
if row["MEMTYPE"].startswith("PROD"):
continue
memname = row["MEMNAME"].lower().strip()
# Need to check if the MEMNAME is a full filename or an ipppssoot
if memname.find("_") != -1:
candidate_list.append(memname)
else:
# Define suffix for all members based on what files are present
if not os.path.exists(memname + member_suffix):
member_suffix = "_flt.fits"
candidate_list.append(memname + member_suffix)
return candidate_list
# ------------------------------------------------------------------------------------------------------------
[docs]
def perform_align(
input_list,
catalog_list,
num_sources,
archive=False,
clobber=False,
debug=False,
update_hdr_wcs=False,
result=None,
runfile="temp_align.log",
print_fit_parameters=True,
print_git_info=False,
output=False,
headerlet_filenames=None,
fit_label=None,
product_type=None,
**alignment_pars,
):
"""Actual Main calling function.
This function performs ``a posteriori`` astrometric fits to the images specified in the
``input_list``. The images are fit to all the catalogs listed in the ``catalog_list`` parameter with
the results being saved and returned as an Astropy Table object. This allows the user
to select the solution that is most appropriate.
Parameters
----------
input_list : list
List of one or more IPPSSOOTs (rootnames) to align.
catalog_list : list
Set of astrometric catalogs which should be used as references for fitting the input images. A
separate fit will be performed for each catalog specified. The catalog name will also be used
as part of the output ``WCSNAME`` value for the fit determined from that catalog.
num_sources : int, None
Maximum number of **brightest sources per chip** which will be used for cross-matching and fitting.
If set to None, all sources will be used.
archive : Boolean
Retain copies of the downloaded files in the astroquery created
sub-directories?
clobber : Boolean
Download and overwrite existing local copies of input files?
debug : Boolean
Attempt to use saved sourcelists stored in pickle files if they exist, or if they do not exist, save
sourcelists in pickle files for reuse so that step 4 can be skipped for faster subsequent
debug/development runs??
update_hdr_wcs : Boolean
Write newly computed WCS information to image image headers?
result: Table
name of variable to be updated by subroutine run.
runfile : string
log file name
print_fit_parameters : Boolean
Specify whether or not to print out FIT results for each chip.
print_git_info : Boolean
Display git repository information?
output : Boolean
Should utils.astrometric_utils.create_astrometric_catalog() generate file 'ref_cat.ecsv' and should
the alignment source catalogs get written out to files?
headerlet_filenames : dictionary, optional
dictionary that maps the flt/flc.fits file name to the corresponding custom headerlet filename.
fit_label : str
String to use as a unique tag for the WCS solutions generated by these fits. This tag will be
inserted between the ``-FIT`` and the catalog name in the ``WCSNAME` keyword value; for example,
``fit_label="User"`` will result in fits to GAIAedr3 with names ending in ``-FIT_User_GAIAedr3``.
alignment_pars : dictionary or keyword args
keyword-arg parameters containing user-specified values for the parameters used in source
identification and alignment which should replace the default values found in the JSON parameter
files in ``drizzlepac.pars.hap_pars`` based on the instrument and detector.
The code will look for default values for all the parameters in the JSON parameter files using
``get_default_pars``. For example, should the user feel it would be more successful to only look
out to a radius of 25 pixels during alignment, the user could simply specify ``searchrad=25``.
Returns
-------
filtered_table : Astropy Table
Table gets updated to contain processing information and alignment results for every raw image evaluated
"""
if debug:
loglevel = logutil.logging.DEBUG
else:
loglevel = logutil.logging.INFO
# Need to ensure the logging works properly for the PyTests where each test starts with a fresh handler
global module_fh
global module_logfile
if module_fh is not None:
print("Removing old file handler for logging.")
log.removeHandler(module_fh)
module_logfile = runfile.upper()
module_fh = logutil.logging.FileHandler(runfile)
module_fh.setLevel(loglevel)
log.addHandler(module_fh)
log.setLevel(loglevel)
log.info(f"{__taskname__} Version {__version__}\n")
# 0: print git info
if print_git_info:
log.info("{} STEP 0: Display Git revision info {}".format("-" * 20, "-" * 49))
full_path = os.path.dirname(__file__)
repo_path = None
if "drizzlepac/drizzlepac" in full_path:
repo_path = full_path.split("drizzlepac/drizzlepac")[0] + "drizzlepac"
elif "hlapipeline" in full_path:
repo_path = full_path.split("drizzlepac")[0] + "drizzlepac"
else:
pass
if not os.path.exists(repo_path):
repo_path = None # protect against non-existent paths
if repo_path:
get_git_rev_info.print_rev_id(
repo_path
) # Display git repository information
else:
log.warning(
"WARNING: Unable to display Git repository revision information."
)
# Initialize key variables
filtered_table = None
# 1: Interpret input data and optional parameters
log.info("{} STEP 1: Get data {}".format("-" * 20, "-" * 66))
zero_dt = starting_dt = datetime.datetime.now()
log.info(str(starting_dt))
imglist = check_and_get_data(
input_list, archive=archive, clobber=clobber, product_type=product_type
)
log.info("SUCCESS")
log.info(f"Processing: {imglist}")
log.info(make_label("Processing time of [STEP 1]", starting_dt))
starting_dt = datetime.datetime.now()
# Get default alignment parameters if not provided by the user...
hdr0 = fits.getheader(imglist[0])
inst = hdr0.get("instrume")
if inst.lower() == "wfpc2" and "detector" not in hdr0:
det = "pc"
else:
det = hdr0.get("detector")
apars = get_default_pars(inst, det)
alignment_pars.update(apars)
alignment_pars["MAX_SOURCES_PER_CHIP"] = num_sources
# define min num of acceptable cross-matches for a good fit
apars["determine_fit_quality"]["min_xmatches"] = apars["determine_fit_quality"][
"MIN_FIT_MATCHES"
]
try:
# Instantiate AlignmentTable class with these input files
alignment_table = align_utils.AlignmentTable(
imglist, log_level=loglevel, **alignment_pars
)
if alignment_table.process_list is None:
log.warning("NO viable images to align.")
alignment_table.close()
return None
process_list = alignment_table.process_list
# Define fitting algorithm list in priority order
# The match_relative_fit algorithm must have more than one image as the first image is
# the reference for the remaining images.
fit_algorithm_list = alignment_table.get_fit_methods()
if len(process_list) == 1:
fit_algorithm_list.remove("relative")
log.info(make_label("Processing time of [STEP 2]", starting_dt))
starting_dt = datetime.datetime.now()
# 3: Build WCS for full set of input observations
log.info("{} STEP 3: Build WCS {}".format("-" * 20, "-" * 65))
# refwcs = amutils.build_reference_wcs(process_list)
log.info("SUCCESS")
log.info(make_label("Processing time of [STEP 3]", starting_dt))
starting_dt = datetime.datetime.now()
# 4: Extract catalog of observable sources from each input image
log.info("{} STEP 4: Source finding {}".format("-" * 20, "-" * 60))
if debug:
pickle_filename = "{}.source_catalog.pickle".format(process_list[0])
if os.path.exists(pickle_filename):
pickle_in = open(pickle_filename, "rb")
alignment_table.extracted_sources = pickle.load(pickle_in)
log.info(
"Using sourcelist extracted from {} generated during the last run to save time.".format(
pickle_filename
)
)
else:
alignment_table.find_alignment_sources(output=True)
pickle_out = open(pickle_filename, "wb")
pickle.dump(alignment_table.extracted_sources, pickle_out)
pickle_out.close()
log.info("Wrote {}".format(pickle_filename))
else:
alignment_table.find_alignment_sources(output=output)
for imgname in alignment_table.extracted_sources.keys():
table = alignment_table.extracted_sources[imgname]
# Get the location of the current image in the filtered table
index = np.where(alignment_table.filtered_table["imageName"] == imgname)[0][0]
# First ensure sources were found
if table is None:
log.warning("No sources found in image {}".format(imgname))
alignment_table.filtered_table[:]["status"] = 1
alignment_table.filtered_table[:]["processMsg"] = "No sources found"
log.info(make_label("Processing time of [STEP 4]", starting_dt))
alignment_table.close()
return None
# The catalog of observable sources must have at least MIN_OBSERVABLE_THRESHOLD entries to be useful
total_num_sources = 0
for chipnum in table.keys():
if table[chipnum] is not None:
total_num_sources += len(table[chipnum])
# Update filtered table with number of found sources
alignment_table.filtered_table[index]["foundSources"] = total_num_sources
# TODO: Revise this logic, if possible, to allow for a subset of the input images to
# be ignored if they have no identifiable sources.
if (
total_num_sources
< apars["determine_fit_quality"]["MIN_OBSERVABLE_THRESHOLD"]
):
log.warning(
"Not enough sources ({}) found in image {}".format(
total_num_sources, imgname
)
)
alignment_table.filtered_table[:]["status"] = 1
alignment_table.filtered_table[:][
"processMsg"
] = "Not enough sources found"
log.info(make_label("Processing time of [STEP 4]", starting_dt))
alignment_table.close()
return None
log.info("SUCCESS")
log.info(make_label("Processing time of [STEP 4]", starting_dt))
starting_dt = datetime.datetime.now()
# 5: Retrieve list of astrometric sources from database
# Convert input images to tweakwcs-compatible FITSWCSCorrector objects and
# attach source catalogs to them.
imglist = []
for group_id, image in enumerate(process_list):
img = amutils.build_wcscat(
image, group_id, alignment_table.extracted_sources[image]
)
imglist.extend(img)
# store mapping of group_id to filename/chip
group_id_dict = {}
for image in imglist:
group_id_dict[
"{}_{}".format(image.meta["rootname"], image.meta["chip"])
] = image.meta["group_id"]
best_fit_rms = -99999.0
best_fit_status_dict = {}
best_fit_qual = 5
best_num_matches = -1
best_fit_label = [None, None]
fit_info_dict = OrderedDict()
for algorithm_name in fit_algorithm_list: # loop over fit algorithm type
log.info("Applying {} fit method".format(algorithm_name))
for catalog_index, catalog_name in enumerate(
catalog_list
): # loop over astrometric catalog
log.info(
"{} STEP 5: Detect astrometric sources {}".format(
"-" * 20, "-" * 48
)
)
log.info("Astrometric Catalog: {}".format(catalog_name))
# store reference catalogs in a dictionary so that generate_astrometric_catalog() doesn't
# execute unnecessarily after it's been run once for a given astrometric catalog.
if catalog_name in alignment_table.reference_catalogs:
log.info(
"Using {} reference catalog from earlier this run.".format(
catalog_name
)
)
reference_catalog = alignment_table.reference_catalogs[catalog_name]
else:
log.info(
"Generating new reference catalog for {};"
" Storing it for potential re-use later this run.".format(
catalog_name
)
)
reference_catalog = generate_astrometric_catalog(
process_list, catalog=catalog_name, output=output
)
alignment_table.reference_catalogs[catalog_name] = reference_catalog
log.info(make_label("Processing time of [STEP 5]", starting_dt))
starting_dt = datetime.datetime.now()
if (
len(reference_catalog)
< apars["determine_fit_quality"]["MIN_CATALOG_THRESHOLD"]
):
log.warning(
"Not enough sources found in catalog {}".format(catalog_name)
)
fit_quality = 5
if catalog_index < len(catalog_list) - 1:
log.info("Try again with other catalog")
else:
# bail out if not enough sources can be found any of the astrometric catalogs
log.warning(
"ERROR! No astrometric sources found in any catalog. Exiting..."
)
alignment_table.filtered_table["status"][:] = 1
alignment_table.filtered_table["processMsg"][:] = "No astrometric sources found"
alignment_table.filtered_table["fit_qual"][:] = fit_quality
current_dt = datetime.datetime.now()
delta_dt = (current_dt - starting_dt).total_seconds()
log.info("Processing time of [STEP 5]: {} sec".format(delta_dt))
alignment_table.close()
return alignment_table
else:
log.info(
"{} Cross matching and fitting {}".format("-" * 20, "-" * 47)
)
log.info(
"{} Catalog {} matched using {} {}".format(
"-" * 18, catalog_name, algorithm_name, "-" * 18
)
)
try:
alignment_table.configure_fit()
log.debug(
"####\n# Running configure fit for AlignmentTable to use: \n"
)
log.debug([img.wcs for img in alignment_table.imglist])
log.debug("####\n")
# restore group IDs to their pristine state prior to each run.
alignment_table.reset_group_id(len(reference_catalog))
# execute the correct fitting/matching algorithm
alignment_table.imglist = alignment_table.perform_fit(
algorithm_name, catalog_name, reference_catalog
)
# determine the quality of the fit
(
fit_rms,
fit_num,
fit_quality,
filtered_table,
fit_status_dict,
) = determine_fit_quality(
alignment_table.imglist,
alignment_table.filtered_table,
(catalog_index < (len(catalog_list) - 1)),
apars,
print_fit_parameters=print_fit_parameters,
loglevel=loglevel,
runfile=runfile,
)
alignment_table.filtered_table = filtered_table
# save fit algorithm name to dictionary key "fit method" in imglist.
for imglist_ctr in range(0, len(alignment_table.imglist)):
table_fit = alignment_table.fit_dict[
(catalog_name, algorithm_name)
]
table_fit[imglist_ctr].meta["fit method"] = algorithm_name
table_fit[imglist_ctr].meta["fit quality"] = fit_quality
# populate fit_info_dict
fit_info_dict[
"{} {}".format(catalog_name, algorithm_name)
] = fit_status_dict[next(iter(fit_status_dict))]
fit_info_dict["{} {}".format(catalog_name, algorithm_name)][
"fit_qual"
] = fit_quality
# Figure out which fit solution to go with based on fit_quality value and maybe also total_rms
if fit_quality < 5:
best_matches = fit_num > best_num_matches
if (
fit_quality == 1
): # valid, non-comprimised solution with total rms < 10 mas...go with this solution.
best_fit_rms = fit_rms
best_fit_label = (catalog_name, algorithm_name)
best_num_matches = fit_num
best_fit_status_dict = fit_status_dict.copy()
best_fit_qual = fit_quality
break # break out of while loop
elif fit_quality < best_fit_qual:
# better solution found. keep looping but with the better solution as "best" for now.
log.info("Better solution found!")
best_fit_rms = fit_rms
best_num_matches = fit_num
best_fit_label = (catalog_name, algorithm_name)
best_fit_status_dict = fit_status_dict.copy()
best_fit_qual = fit_quality
elif fit_quality == best_fit_qual:
# new solution same level of fit_quality.
# Choose whichever one has the most matches and lowest total rms as "best"
# and keep looping to see if another fit is better.
if best_fit_rms >= 0.0:
if best_matches and fit_rms < best_fit_rms:
best_fit_rms = fit_rms
best_num_matches = fit_num
best_fit_label = (catalog_name, algorithm_name)
best_fit_status_dict = fit_status_dict.copy()
best_fit_qual = fit_quality
else: # new solution has worse fit_quality. discard and continue looping.
continue
except Exception:
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(
exc_type, exc_value, exc_tb, file=sys.stdout
)
log.warning(
"WARNING: Catastrophic fitting failure with catalog {} and matching "
"algorithm {}.".format(catalog_name, algorithm_name)
)
alignment_table.filtered_table["status"][:] = 1
alignment_table.filtered_table["processMsg"][
:
] = "Fitting failure"
# It may be there are additional catalogs and algorithms to try, so keep going
fit_quality = 5 # Flag this fit with the 'bad' quality value
alignment_table.filtered_table["fit_qual"][:] = fit_quality
continue
if fit_quality == 1: # break out of inner astrometric catalog loop
break
# break out of outer fit algorithm loop either with a fit_rms < 10 or a 'valid' relative fit
if fit_quality == 1 or (
best_fit_qual in [2, 3, 4] and "relative" in algorithm_name
):
break
log.info("best_fit found to be: {}".format(best_fit_label))
log.info("FIT_DICT: {}".format(alignment_table.fit_dict.keys()))
# Reset imglist to point to best solution...
alignment_table.select_fit(best_fit_label[0], best_fit_label[1])
imglist = alignment_table.selected_fit
filtered_table = alignment_table.filtered_table
# Report processing time for this step
log.info(make_label("Processing time of [STEP 5b]", starting_dt))
starting_dt = datetime.datetime.now()
# 6: Populate the filtered_table
log.info(
"{} STEP 6: Collect up information and populate the filtered table "
"{}".format("-" * 20, "-" * 20)
)
if 0 < best_fit_rms < apars["determine_fit_quality"]["MAX_FIT_RMS"]:
log.info(
"The fitting process was successful with a best fit total "
"rms of {} mas".format(best_fit_rms)
)
else:
log.info(
"The fitting process was unsuccessful with a best fit total rms "
"of {} mas".format(best_fit_rms)
)
if 0 < best_fit_rms < apars["determine_fit_quality"]["MAX_FIT_LIMIT"]:
# Update filtered table with best fit results
filtered_table["status"][:] = 0
fit_status_dict = best_fit_status_dict.copy()
for item in imglist:
imgname = item.meta["name"]
index = np.where(filtered_table["imageName"] == imgname)[0][0]
# populate self.filtered_table fields "status", "compromised" and
# "processMsg" with fit_status_dict fields "valid", "compromised"
# and "reason".
explicit_dict_key = "{},{}".format(item.meta["name"], item.meta["chip"])
if fit_status_dict[explicit_dict_key]["valid"] is True:
filtered_table[index]["status"] = 0
else:
filtered_table[index]["status"] = 1
if fit_status_dict[explicit_dict_key]["compromised"] is False:
filtered_table["compromised"] = 0
else:
filtered_table["compromised"] = 1
filtered_table[index]["processMsg"] = fit_status_dict[
explicit_dict_key
]["reason"]
filtered_table["fit_qual"][index] = item.meta["fit quality"]
log.info(make_label("Processing time of [STEP 6]", starting_dt))
starting_dt = datetime.datetime.now()
# 7: Write new fit solution to input image headers
log.info(
"{} STEP 7: Update image headers with new WCS information "
"{}".format("-" * 20, "-" * 29)
)
if (0 < best_fit_rms < 9999.0) and update_hdr_wcs:
# determine the quality of the fit
alignment_table.apply_fit(headerlet_filenames, fit_label=fit_label)
log.info("SUCCESS")
else:
log.info(" STEP SKIPPED")
log.info(make_label("Processing time of [STEP 7]", starting_dt))
log.info(
"TOTAL Processing time of {} sec".format(
(datetime.datetime.now() - zero_dt).total_seconds()
)
)
log.info(best_fit_status_dict)
log.info("-" * 104)
log.info("-" * 104)
log.info(" SUMMARY OF ALL FIT ATTEMPTS")
for item in fit_info_dict.keys():
log.info("{} {}".format(item, fit_info_dict[item]))
log.info("-" * 104)
except Exception:
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_tb, file=sys.stdout)
finally:
# Always make sure that all file handles are closed
alignment_table.close()
# Now update the result with the filtered_table contents
if result:
result.meta = filtered_table.meta
for col in filtered_table.colnames:
result.add_column(filtered_table[col], name=col)
if filtered_table is not None:
filtered_table.pprint(max_width=-1)
return alignment_table
# ----------------------------------------------------------------------------------------------------------
def make_label(label, starting_dt):
"""Create a time-stamped label for use in log messages"""
current_dt = datetime.datetime.now()
delta_dt = (current_dt - starting_dt).total_seconds()
return "{}: {} sec".format(label, delta_dt)
# ----------------------------------------------------------------------------------------------------------
[docs]
def determine_fit_quality(
imglist,
filtered_table,
catalogs_remaining,
align_pars,
print_fit_parameters=True,
loglevel=logutil.logging.NOTSET,
runfile="temp_align.log",
):
"""Determine the quality of the fit to the data
Parameters
----------
imglist : list
output of interpret_fits. Contains sourcelist tables, newly computed WCS info, etc. for every chip of
every valid input image. This list should have been updated, in-place, with the new RMS values;
specifically,
* 'FIT_RMS': RMS of the separations between fitted image positions and reference positions
* 'TOTAL_RMS': mean of the FIT_RMS values for all observations
* 'NUM_FITS': number of images/group_id's with successful fits included in the TOTAL_RMS
These entries are added to the 'fit_info' dictionary.
filtered_table : object
Astropy Table object containing data pertaining to the associated dataset, including
the doProcess bool. It is intended this table is updated by subsequent functions for
bookkeeping purposes.
catalogs_remaining : bool
Specify whether additional catalogs remain to be fit against.
align_pars : dict
Parameters from the appropriate '_align_all.json' parameter file
print_fit_parameters : bool
Specify whether or not to print out FIT results for each chip
log_level : int, optional
The desired level of verboseness in the log statements displayed on the screen and written to the
.log file. Default value is 20, or 'info'.
runfile : string
log file name
Returns
-------
max_rms_val : float
The best Total rms determined from all of the images
num_xmatches: int
The number of stars used in matching the data
fit_quality : int
fit quality category:
* 1 = valid solution with rms < 10 mas
* 2 = Valid but compromised solution with rms < 10 mas
* 3 = Valid solution with RMS >= 10 mas
* 4 = Valid but compromised solution with RMS >= 10 mas
* 5 = Not valid solution
filtered_table : object
modified filtered_table object
fit_status_dict : dictionary
Dictionary containing the following:
* overall fit validity (Boolean)
* total (visit-level) RMS value in mas (float)
* number of matched sources (int)
* fit compromised status (Boolean)
* reason fit is considered 'compromised' (only populated if "compromised" field is "True")
"""
# Set up the log file handler and name of the log file
# If the log file handler were never set, the module_fh will be None.
# Only want to remove a file handler if there were one set in the first place.
global module_fh
global module_logfile
# if module_fh is not None and module_logfile != runfile.upper():
if module_fh is not None:
print("Removing old file handler for logging.")
log.removeHandler(module_fh)
module_logfile = runfile.upper()
module_fh = logutil.logging.FileHandler(runfile)
module_fh.setLevel(loglevel)
log.addHandler(module_fh)
log.setLevel(loglevel)
log.info("Log file: {}".format(module_logfile))
max_rms_val = 1e9
fit_status_dict = {}
xshifts = []
yshifts = []
overall_valid = True
overall_comp = False
# See whether or not 'consistency_check' should be performed.
# This will only be turned on for pipeline processing due to the potentially extreme
# variations in photometry across a visit or in a SkyCell.
do_consistency_check = align_pars["determine_fit_quality"].get(
"do_consistency_check", True
)
auto_good_rms = float(align_pars["determine_fit_quality"]["MAX_FIT_RMS"])
for item in imglist:
if item.meta["fit_info"]["status"].startswith("REFERENCE"):
# Reference will never be modified, so set shifts to (0., 0.)
# so that there are entries for all images.
xshifts.append(0.0)
yshifts.append(0.0)
continue
if not item.meta["fit_info"]["status"].startswith("FAILED"):
xshifts.append(item.meta["fit_info"]["shift"][0])
yshifts.append(item.meta["fit_info"]["shift"][1])
else:
# Fit not successful, so no shifts to collate
break
for item in imglist:
num_xmatches = 0
image_name = item.meta["name"]
chip_num = item.meta["chip"]
fit_info = item.meta["fit_info"]
fitgeom = fit_info["fitgeom"] if "fitgeom" in fit_info else "rscale"
log.debug("\n{}\n".format("-" * 40))
log.debug("FIT being evaluated for {}".format(image_name))
log.debug(fit_info)
log.debug("\n{}\n".format("-" * 40))
# Build fit_status_dict entry
dict_key = "{},{}".format(image_name, chip_num)
fit_status_dict[dict_key] = {
"valid": True,
"max_rms": max_rms_val,
"num_matches": num_xmatches,
"compromised": False,
"reason": "",
}
# Handle fitting failures (no matches found or any other failure in fit)
if fit_info["status"].startswith("FAILED"):
log.warning(
"Alignment FAILED for {} - no processing done.".format(image_name)
)
overall_valid = False
continue
if fit_info["status"].startswith("REFERENCE") or "FIT_RMS" not in fit_info:
# No fit information available for reference image
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = True
fit_status_dict[dict_key]["reason"] = "No fit information"
overall_valid = False
overall_comp = True
continue
fit_rms_val = fit_info["FIT_RMS"]
max_rms_val = fit_info["TOTAL_RMS"]
# fit_rms_ra = fit_info['RMS_RA']
# fit_rms_dec = fit_info['RMS_DEC']
# rms_ratio = abs(fit_rms_ra - fit_rms_dec) / min(fit_rms_ra, fit_rms_dec)
num_xmatches = len(fit_info["ref_mag"]) # fit_info['nmatches']
fit_status_dict[dict_key]["max_rms"] = max_rms_val
fit_status_dict[dict_key]["num_matches"] = num_xmatches
if num_xmatches < align_pars["general"]["MIN_FIT_MATCHES"]:
overall_valid = False
if catalogs_remaining:
log.warning(
"Not enough cross matches found between astrometric"
" catalog and sources found in {} ()".format(image_name, num_xmatches)
)
continue
# Compute correlation between input and GAIA magnitudes
# This check will only be performed when the fit may be uncertain
# due to less than 100 matches.
ref_cat_limit = min(1000, item.meta["num_ref_catalog"])
log.info(
"MAG CHECK REF_CAT_LIMIT: {} XMATCHES: {}".format(
ref_cat_limit, num_xmatches
)
)
if num_xmatches < max(0.1 * ref_cat_limit, 10):
cross_match_check = amutils.check_mag_corr([item])[0]
log.info(
"Cross-match check: {} on {} ref sources".format(
cross_match_check, item.meta["num_ref_catalog"]
)
)
else:
cross_match_check = True
# Execute checks
nmatches_check = False
if num_xmatches >= align_pars["run_align"]["mosaic_fitgeom_list"][fitgeom] or (
num_xmatches >= 2 and fit_rms_val > 0.5
):
nmatches_check = True
radial_offset_check = False
radial_offset = (
math.sqrt(
float(fit_info["shift"][0]) ** 2 + float(fit_info["shift"][1]) ** 2
)
* item.wcs.pscale
) # radial offset in arssec
# Without the '+2', this will always fail for xmatches < 3 regardless of how small
# the offset is: 2*0.36 == 0.72 vs 0.8 + 0 [perfect alignment]
# Adding 2 allows low offset solutions with only 1 or 2 sources to pass this check.
if float(num_xmatches + 2) * 0.36 > 0.8 + (radial_offset / 10.0) ** 8:
radial_offset_check = True
large_rms_check = True
max_fit_limit = align_pars["determine_fit_quality"]["MAX_FIT_LIMIT"]
if fit_rms_val > max_fit_limit or max_rms_val > max_fit_limit:
large_rms_check = False
# fitRmsCheck = False
# if fit_rms_val < max_rms_val:
# fitRmsCheck = True
consistency_check = True
if do_consistency_check:
rms_limit = max(fit_info["TOTAL_RMS"], auto_good_rms)
if not math.sqrt(
np.std(np.asarray(xshifts)) ** 2 + np.std(np.asarray(yshifts)) ** 2
) <= (rms_limit / align_pars["determine_fit_quality"]["MAS_TO_ARCSEC"]) / (
item.wcs.pscale
): # \
# or rms_ratio > MAX_RMS_RATIO:
consistency_check = False
# Decide if fit solutions are valid based on checks
if not consistency_check: # Failed consistency check
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = "Consistency violation!"
elif not large_rms_check: # RMS value(s) too large
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = "RMS too large (>150 mas)!"
elif not radial_offset_check: # Failed radial offset check
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = "Radial offset value too large!"
elif not nmatches_check: # Too few matches
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = True
fit_status_dict[dict_key]["reason"] = "Too few matches!"
elif not cross_match_check:
fit_status_dict[dict_key]["valid"] = True
fit_status_dict[dict_key]["compromised"] = True
fit_status_dict[dict_key][
"reason"
] = "Cross-match magnitudes not correlated!"
else: # all checks passed. Valid solution.
fit_status_dict[dict_key]["valid"] = True
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = ""
# for now, generate overall valid and compromised values. Basically, if any of the entries for "valid" is False,
# "valid" is False, treat the whole dataset as not valid. Same goes for compromised.
if not fit_status_dict[dict_key]["valid"]:
overall_valid = False
if fit_status_dict[dict_key]["compromised"]:
overall_comp = True
log.info(
"RESULTS FOR {} Chip {}: FIT_RMS = {} mas, TOTAL_RMS = {}"
" mas, NUM = {}".format(
image_name, item.meta["chip"], fit_rms_val, max_rms_val, num_xmatches
)
)
# print fit params to screen
if print_fit_parameters:
log_info_keys = [
"status",
"fitgeom",
"eff_minobj",
"matrix",
"shift",
"center",
"proper_rot",
"proper",
"<rot>",
"<scale>",
"skew",
"rmse",
"mae",
"nmatches",
"FIT_RMS",
"TOTAL_RMS",
"NUM_FITS",
"RMS_RA",
"RMS_DEC",
"catalog",
]
log.info("{} FIT PARAMETERS {}".format("~" * 35, "~" * 34))
log.info("image: {}".format(image_name))
log.info("chip: {}".format(item.meta["chip"]))
log.info("group_id: {}".format(item.meta["group_id"]))
for tweakwcs_info_key in log_info_keys:
log.info(
"{} : {}".format(tweakwcs_info_key, fit_info[tweakwcs_info_key])
)
log.info("~" * 84)
log.info(
"nmatches_check: {} radial_offset_check: {}"
" large_rms_check: {},"
" consistency_check: {}".format(
nmatches_check,
radial_offset_check,
large_rms_check,
consistency_check,
)
)
# determine which fit quality category this latest fit falls into
if overall_valid is False:
fit_quality = 5
log.info("FIT SOLUTION REJECTED")
filtered_table["status"][:] = 1
for ctr in range(0, len(filtered_table)):
imgname = filtered_table[ctr]["imageName"] + ",1"
if imgname in fit_status_dict:
filtered_table[ctr]["processMsg"] = fit_status_dict[imgname]["reason"]
else:
filtered_table[ctr]["processMsg"] = "Not a valid exposure"
else:
for ctr in range(0, len(filtered_table)):
filtered_table[ctr]["processMsg"] = ""
if overall_comp is False and max_rms_val < auto_good_rms:
log.info("Valid solution with RMS < {} mas found!".format(auto_good_rms))
fit_quality = 1
elif overall_comp is True and max_rms_val < auto_good_rms:
log.info(
"Valid but compromised solution with RMS < {} mas found!".format(
auto_good_rms
)
)
fit_quality = 2
elif overall_comp is False and 1000.0 >= max_rms_val >= auto_good_rms:
log.info("Valid solution with RMS >= {} mas found!".format(auto_good_rms))
fit_quality = 3
else:
log.info(
"Valid but compromised solution with RMS >= {} mas found!".format(
auto_good_rms
)
)
fit_quality = 4
if print_fit_parameters:
for item in imglist:
log.info(
fit_status_dict["{},{}".format(item.meta["name"], item.meta["chip"])]
)
if max_rms_val > auto_good_rms:
log.info(
"Total fit RMS value = {} mas greater than the maximum threshold value {}.".format(
max_rms_val, auto_good_rms
)
)
if not overall_valid:
log.info("The fit solution for some or all of the images is not valid.")
if max_rms_val > auto_good_rms or not overall_valid:
log.info("Trying again with the next catalog, method, or geometry depending upon the current fitting cycle.")
else:
log.info("Fit calculations successful.")
return max_rms_val, num_xmatches, fit_quality, filtered_table, fit_status_dict
# ----------------------------------------------------------------------------------------------------------------------
def determine_fit_quality_mvm_interface(
imglist,
filtered_table,
catalogs_remaining,
ref_catalog_length,
align_pars,
print_fit_parameters=True,
loglevel=logutil.logging.NOTSET,
runfile="temp_align.log",
):
"""Simple interface to allow MVM code to use determine_fit_quality().
Parameters
----------
imglist : list
output of interpret_fits. Contains sourcelist tables, newly computed WCS info, etc. for every chip of
every valid input image. This list should have been updated, in-place, with the new RMS values;
specifically,
* 'FIT_RMS': RMS of the separations between fitted image positions and reference positions
* 'TOTAL_RMS': mean of the FIT_RMS values for all observations
* 'NUM_FITS': number of images/group_id's with successful fits included in the TOTAL_RMS
These entries are added to the 'fit_info' dictionary.
filtered_table : object
Astropy Table object containing data pertaining to the associated dataset, including
the doProcess bool. It is intended this table is updated by subsequent functions for
bookkeeping purposes.
catalogs_remaining : bool
Specify whether additional catalogs remain to be fit against.
align_pars : dict
Parameters from the appropriate '_align_all.json' parameter file
print_fit_parameters : bool
Specify whether or not to print out FIT results for each chip
log_level : int, optional
The desired level of verboseness in the log statements displayed on the screen and written to the
.log file. Default value is 20, or 'info'.
runfile : string
log file name
Returns
-------
is_good_fit : bool
Is the fit acceptable (Is fit_quality value in GOOD_FIT_QUALITY_VALUES)?
max_rms_val : float
The best Total rms determined from all of the images
num_xmatches: int
The number of stars used in matching the data
fit_quality : int
fit quality category:
* 1 = valid solution with rms < 10 mas
* 2 = Valid but compromised solution with rms < 10 mas
* 3 = Valid solution with RMS >= 10 mas
* 4 = Valid but compromised solution with RMS >= 10 mas
* 5 = Not valid solution
filtered_table : object
modified filtered_table object
fit_status_dict : dictionary
Dictionary containing the following:
* overall fit validity (Boolean)
* total (visit-level) RMS value in mas (float)
* number of matched sources (int)
* fit compromised status (Boolean)
* reason fit is considered 'compromised' (only populated if "compromised" field is "True")
"""
log.setLevel(loglevel)
# Check if num_ref_catalog is in imglist...if not, add it.
for ctr in range(0, len(imglist)):
if "num_ref_catalog" not in imglist[ctr].meta.keys():
imglist[ctr].meta["num_ref_catalog"] = ref_catalog_length
# Execute determine_fit_quality
(
fit_rms,
fit_num,
fit_quality,
filtered_table,
fit_status_dict,
) = determine_fit_quality(
imglist,
filtered_table,
catalogs_remaining,
align_pars,
print_fit_parameters=print_fit_parameters,
loglevel=loglevel,
runfile=runfile,
)
# Determine if the fit quality is acceptable
if fit_quality in align_pars["determine_fit_quality"]["GOOD_FIT_QUALITY_VALUES"]:
is_good_fit = True
else:
is_good_fit = False
return is_good_fit, fit_rms, fit_num, fit_quality, filtered_table, fit_status_dict
# ----------------------------------------------------------------------------------------------------------------------
def generate_astrometric_catalog(imglist, **pars):
"""Generates a catalog of all sources from an existing astrometric catalog are
in or near the FOVs of the images in the input list.
Parameters
----------
imglist : list
List of one or more calibrated fits images that will be used for catalog generation.
output : str, optional
If specified as part of input pars dict, it provides the name of the output catalog file
overwrite : bool, optional
If specified as part of the input pars dict, it specifies whether or not to overwrite any
catalog file already present with the same path/filename as specified in `output`.
Returns
=======
ref_table : object
Astropy Table object of the catalog
"""
# generate catalog
temp_pars = pars.copy()
if pars["output"] is True:
pars["output"] = "ref_cat.ecsv"
else:
pars["output"] = None
overwrite = pars.get("clobber", True)
out_catalog = amutils.create_astrometric_catalog(imglist, **pars)
pars = temp_pars.copy()
# if the catalog has contents, write the catalog to ascii text file
if len(out_catalog) > 0 and pars["output"]:
catalog_filename = "refcatalog.cat"
out_catalog.write(
catalog_filename, format="ascii.fast_commented_header", overwrite=overwrite
)
log.info("Wrote reference catalog {}.".format(catalog_filename))
return out_catalog
# ----------------------------------------------------------------------------------------------------------------------
def get_default_pars(
instrument,
detector,
step="alignment",
condition=["filter_basic"],
hap_pipeline_name="svm",
):
step_list = config_utils.step_title_list
if step not in step_list:
log.error("{} not valid! Needs to be one of: {}".format(step, step_list))
raise ValueError
full_cfg_index, pars_dir = config_utils.read_index(
instrument, detector, hap_pipeline_name=hap_pipeline_name
)
par_class = config_utils.step_name_list[step_list.index(step)]
apars = par_class(
full_cfg_index[step], condition, hap_pipeline_name, pars_dir, step, True, None
)
return apars.outpars