Source code for drizzlepac.align
#!/usr/bin/env python
"""This script is a modernized implementation of tweakreg.
"""
import datetime
import sys
import math
import os
import pickle
import logging
from collections import OrderedDict
import traceback
import numpy as np
from astropy.io import fits
from stsci.tools import logutil
from .haputils import astrometric_utils as amutils
from .haputils import get_git_rev_info
from .haputils import align_utils
from .haputils import config_utils
from . import __version__
__taskname__ = "align"
MSG_DATEFMT = "%Y%j%H%M%S"
SPLUNK_MSG_FORMAT = "%(asctime)s %(levelname)s src=%(name)s- %(message)s"
log = logging.getLogger(__name__)
[docs]
def perform_align(
imglist,
catalog_list,
num_sources,
archive=False,
clobber=False,
debug=False,
update_hdr_wcs=False,
result=None,
runfile="temp_align.log",
print_fit_parameters=True,
print_git_info=False,
output=False,
headerlet_filenames=None,
fit_label=None,
product_type=None,
**alignment_pars,
):
"""Actual Main calling function.
This function performs ``a posteriori`` astrometric fits to the images specified in the
``imglist``. The images are fit to all the catalogs listed in the ``catalog_list`` parameter with
the results being saved and returned as an Astropy Table object. This allows the user
to select the solution that is most appropriate.
Parameters
----------
imglist : list
List of one or more IPPSSOOTs (rootnames) to align.
catalog_list : list
Set of astrometric catalogs which should be used as references for fitting the input images. A
separate fit will be performed for each catalog specified. The catalog name will also be used
as part of the output ``WCSNAME`` value for the fit determined from that catalog.
num_sources : int, None
Maximum number of **brightest sources per chip** which will be used for cross-matching and fitting.
If set to None, all sources will be used.
archive : Boolean
Retain copies of the downloaded files in the astroquery created
sub-directories?
clobber : Boolean
Download and overwrite existing local copies of input files?
debug : Boolean
Attempt to use saved sourcelists stored in pickle files if they exist, or if they do not exist, save
sourcelists in pickle files for reuse so that step 4 can be skipped for faster subsequent
debug/development runs??
update_hdr_wcs : Boolean
Write newly computed WCS information to image image headers?
result: Table
name of variable to be updated by subroutine run.
runfile : string
log file name
print_fit_parameters : Boolean
Specify whether or not to print out FIT results for each chip.
print_git_info : Boolean
Display git repository information?
output : Boolean
Should utils.astrometric_utils.create_astrometric_catalog() generate file 'ref_cat.ecsv' and should
the alignment source catalogs get written out to files?
headerlet_filenames : dictionary, optional
dictionary that maps the flt/flc.fits file name to the corresponding custom headerlet filename.
fit_label : str
String to use as a unique tag for the WCS solutions generated by these fits. This tag will be
inserted between the ``-FIT`` and the catalog name in the ``WCSNAME` keyword value; for example,
``fit_label="User"`` will result in fits to GAIAedr3 with names ending in ``-FIT_User_GAIAedr3``.
alignment_pars : dictionary or keyword args
keyword-arg parameters containing user-specified values for the parameters used in source
identification and alignment which should replace the default values found in the JSON parameter
files in ``drizzlepac.pars.hap_pars`` based on the instrument and detector.
The code will look for default values for all the parameters in the JSON parameter files using
``get_default_pars``. For example, should the user feel it would be more successful to only look
out to a radius of 25 pixels during alignment, the user could simply specify ``searchrad=25``.
Returns
-------
filtered_table : Astropy Table
Table gets updated to contain processing information and alignment results for every raw image evaluated
"""
if debug:
loglevel = logutil.logging.DEBUG
else:
loglevel = logutil.logging.INFO
log.debug(f"{__taskname__} Version {__version__}\n")
# 0: print git info
if print_git_info:
log.debug("{} STEP 0: Display Git revision info {}".format("-" * 20, "-" * 49))
full_path = os.path.dirname(__file__)
repo_path = None
if "drizzlepac/drizzlepac" in full_path:
repo_path = full_path.split("drizzlepac/drizzlepac")[0] + "drizzlepac"
elif "hlapipeline" in full_path:
repo_path = full_path.split("drizzlepac")[0] + "drizzlepac"
else:
pass
if not os.path.exists(repo_path):
repo_path = None # protect against non-existent paths
if repo_path:
get_git_rev_info.print_rev_id(
repo_path
) # Display git repository information
else:
log.warning(
"WARNING: Unable to display Git repository revision information."
)
# Initialize key variables
filtered_table = None
# 1: Interpret input data and optional parameters
log.info("{} STEP 1: Get data {}".format("-" * 20, "-" * 66))
zero_dt = starting_dt = datetime.datetime.now()
log.info(str(starting_dt))
log.info("SUCCESS")
log.info(f"Processing: {imglist}")
log.debug(make_label("Processing time of [STEP 1]", starting_dt))
starting_dt = datetime.datetime.now()
# Get default alignment parameters if not provided by the user...
hdr0 = fits.getheader(imglist[0])
inst = hdr0.get("instrume")
if inst.lower() == "wfpc2" and "detector" not in hdr0:
det = "pc"
else:
det = hdr0.get("detector")
apars = get_default_pars(inst, det)
alignment_pars.update(apars)
alignment_pars["MAX_SOURCES_PER_CHIP"] = num_sources
# define min num of acceptable cross-matches for a good fit
apars["determine_fit_quality"]["min_xmatches"] = apars["determine_fit_quality"][
"MIN_FIT_MATCHES"
]
try:
# Instantiate AlignmentTable class with these input files
alignment_table = align_utils.AlignmentTable(
imglist, log_level=loglevel, **alignment_pars
)
if alignment_table.process_list is None:
log.warning("NO viable images to align.")
alignment_table.close()
return None
process_list = alignment_table.process_list
# Define fitting algorithm list in priority order
# The match_relative_fit algorithm must have more than one image as the first image is
# the reference for the remaining images.
fit_algorithm_list = alignment_table.get_fit_methods()
if len(process_list) == 1:
fit_algorithm_list.remove("relative")
log.debug(make_label("Processing time of [STEP 2]", starting_dt))
starting_dt = datetime.datetime.now()
# 3: Build WCS for full set of input observations
log.info("{} STEP 3: Build WCS {}".format("-" * 20, "-" * 65))
# refwcs = amutils.build_reference_wcs(process_list)
log.debug("SUCCESS")
log.debug(make_label("Processing time of [STEP 3]", starting_dt))
starting_dt = datetime.datetime.now()
# 4: Extract catalog of observable sources from each input image
log.info("{} STEP 4: Source finding {}".format("-" * 20, "-" * 60))
if debug:
pickle_filename = "{}.source_catalog.pickle".format(process_list[0])
if os.path.exists(pickle_filename):
pickle_in = open(pickle_filename, "rb")
alignment_table.extracted_sources = pickle.load(pickle_in)
log.debug(
"Using sourcelist extracted from {} generated during the last run to save time.".format(
pickle_filename
)
)
else:
alignment_table.find_alignment_sources(output=True)
pickle_out = open(pickle_filename, "wb")
pickle.dump(alignment_table.extracted_sources, pickle_out)
pickle_out.close()
log.debug("Wrote {}".format(pickle_filename))
else:
alignment_table.find_alignment_sources(output=output)
for imgname in alignment_table.extracted_sources.keys():
table = alignment_table.extracted_sources[imgname]
# Get the location of the current image in the filtered table
index = np.where(alignment_table.filtered_table["imageName"] == imgname)[0][0]
# First ensure sources were found
if table is None:
log.warning("No sources found in image {}".format(imgname))
alignment_table.filtered_table[:]["status"] = 1
alignment_table.filtered_table[:]["processMsg"] = "No sources found"
log.debug(make_label("Processing time of [STEP 4]", starting_dt))
alignment_table.close()
return None
# The catalog of observable sources must have at least MIN_OBSERVABLE_THRESHOLD entries to be useful
total_num_sources = 0
for chipnum in table.keys():
if table[chipnum] is not None:
total_num_sources += len(table[chipnum])
# Update filtered table with number of found sources
alignment_table.filtered_table[index]["foundSources"] = total_num_sources
# TODO: Revise this logic, if possible, to allow for a subset of the input images to
# be ignored if they have no identifiable sources.
if (
total_num_sources
< apars["determine_fit_quality"]["MIN_OBSERVABLE_THRESHOLD"]
):
log.warning(
"Not enough sources ({}) found in image {}".format(
total_num_sources, imgname
)
)
alignment_table.filtered_table[:]["status"] = 1
alignment_table.filtered_table[:][
"processMsg"
] = "Not enough sources found"
log.debug(make_label("Processing time of [STEP 4]", starting_dt))
alignment_table.close()
return None
log.debug("SUCCESS")
log.debug(make_label("Processing time of [STEP 4]", starting_dt))
starting_dt = datetime.datetime.now()
# 5: Retrieve list of astrometric sources from database
# Convert input images to tweakwcs-compatible FITSWCSCorrector objects and
# attach source catalogs to them.
imglist = []
for group_id, image in enumerate(process_list):
img = amutils.build_wcscat(
image, group_id, alignment_table.extracted_sources[image]
)
imglist.extend(img)
# store mapping of group_id to filename/chip
group_id_dict = {}
for image in imglist:
group_id_dict[
"{}_{}".format(image.meta["rootname"], image.meta["chip"])
] = image.meta["group_id"]
best_fit_rms = -99999.0
best_fit_status_dict = {}
best_fit_qual = 5
best_num_matches = -1
best_fit_label = [None, None]
fit_info_dict = OrderedDict()
for algorithm_name in fit_algorithm_list: # loop over fit algorithm type
log.debug("Applying {} fit method".format(algorithm_name))
for catalog_index, catalog_name in enumerate(
catalog_list
): # loop over astrometric catalog
log.info(
"{} STEP 5: Detect astrometric sources {}".format(
"-" * 20, "-" * 48
)
)
log.debug("Astrometric Catalog: {}".format(catalog_name))
# store reference catalogs in a dictionary so that generate_astrometric_catalog() doesn't
# execute unnecessarily after it's been run once for a given astrometric catalog.
if catalog_name in alignment_table.reference_catalogs:
log.debug(
"Using {} reference catalog from earlier this run.".format(
catalog_name
)
)
reference_catalog = alignment_table.reference_catalogs[catalog_name]
else:
log.debug(
"Generating new reference catalog for {};"
" Storing it for potential re-use later this run.".format(
catalog_name
)
)
reference_catalog = generate_astrometric_catalog(
process_list, catalog=catalog_name, output=output
)
alignment_table.reference_catalogs[catalog_name] = reference_catalog
log.debug(make_label("Processing time of [STEP 5]", starting_dt))
starting_dt = datetime.datetime.now()
if (
len(reference_catalog)
< apars["determine_fit_quality"]["MIN_CATALOG_THRESHOLD"]
):
log.warning(
"Not enough sources found in catalog {}".format(catalog_name)
)
fit_quality = 5
if catalog_index < len(catalog_list) - 1:
log.debug("Try again with other catalog")
else:
# bail out if not enough sources can be found any of the astrometric catalogs
log.warning(
"ERROR! No astrometric sources found in any catalog. Exiting..."
)
alignment_table.filtered_table["status"][:] = 1
alignment_table.filtered_table["processMsg"][:] = "No astrometric sources found"
alignment_table.filtered_table["fit_qual"][:] = fit_quality
current_dt = datetime.datetime.now()
delta_dt = (current_dt - starting_dt).total_seconds()
log.debug("Processing time of [STEP 5]: {} sec".format(delta_dt))
alignment_table.close()
return alignment_table
else:
log.debug(
"{} Cross matching and fitting {}".format("-" * 20, "-" * 47)
)
log.debug(
"{} Catalog {} matched using {} {}".format(
"-" * 18, catalog_name, algorithm_name, "-" * 18
)
)
try:
alignment_table.configure_fit()
log.debug(
"####\n# Running configure fit for AlignmentTable to use: \n"
)
log.debug([img.wcs for img in alignment_table.imglist])
log.debug("####\n")
# restore group IDs to their pristine state prior to each run.
alignment_table.reset_group_id(len(reference_catalog))
# execute the correct fitting/matching algorithm
alignment_table.imglist = alignment_table.perform_fit(
algorithm_name, catalog_name, reference_catalog
)
# determine the quality of the fit
(
fit_rms,
fit_num,
fit_quality,
filtered_table,
fit_status_dict,
) = determine_fit_quality(
alignment_table.imglist,
alignment_table.filtered_table,
(catalog_index < (len(catalog_list) - 1)),
apars,
print_fit_parameters=print_fit_parameters,
loglevel=loglevel,
runfile=runfile,
)
alignment_table.filtered_table = filtered_table
# save fit algorithm name to dictionary key "fit method" in imglist.
for imglist_ctr in range(0, len(alignment_table.imglist)):
table_fit = alignment_table.fit_dict[
(catalog_name, algorithm_name)
]
table_fit[imglist_ctr].meta["fit method"] = algorithm_name
table_fit[imglist_ctr].meta["fit quality"] = fit_quality
# populate fit_info_dict
fit_info_dict[
"{} {}".format(catalog_name, algorithm_name)
] = fit_status_dict[next(iter(fit_status_dict))]
fit_info_dict["{} {}".format(catalog_name, algorithm_name)][
"fit_qual"
] = fit_quality
# Figure out which fit solution to go with based on fit_quality value and maybe also total_rms
if fit_quality < 5:
best_matches = fit_num > best_num_matches
if (
fit_quality == 1
): # valid, non-comprimised solution with total rms < 10 mas...go with this solution.
best_fit_rms = fit_rms
best_fit_label = (catalog_name, algorithm_name)
best_num_matches = fit_num
best_fit_status_dict = fit_status_dict.copy()
best_fit_qual = fit_quality
break # break out of while loop
elif fit_quality < best_fit_qual:
# better solution found. keep looping but with the better solution as "best" for now.
log.debug("Better solution found!")
best_fit_rms = fit_rms
best_num_matches = fit_num
best_fit_label = (catalog_name, algorithm_name)
best_fit_status_dict = fit_status_dict.copy()
best_fit_qual = fit_quality
elif fit_quality == best_fit_qual:
# new solution same level of fit_quality.
# Choose whichever one has the most matches and lowest total rms as "best"
# and keep looping to see if another fit is better.
if best_fit_rms >= 0.0:
if best_matches and fit_rms < best_fit_rms:
best_fit_rms = fit_rms
best_num_matches = fit_num
best_fit_label = (catalog_name, algorithm_name)
best_fit_status_dict = fit_status_dict.copy()
best_fit_qual = fit_quality
else: # new solution has worse fit_quality. discard and continue looping.
continue
except Exception:
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(
exc_type, exc_value, exc_tb, file=sys.stdout
)
log.warning(
"WARNING: Catastrophic fitting failure with catalog {} and matching "
"algorithm {}.".format(catalog_name, algorithm_name)
)
alignment_table.filtered_table["status"][:] = 1
alignment_table.filtered_table["processMsg"][
:
] = "Fitting failure"
# It may be there are additional catalogs and algorithms to try, so keep going
fit_quality = 5 # Flag this fit with the 'bad' quality value
alignment_table.filtered_table["fit_qual"][:] = fit_quality
continue
if fit_quality == 1: # break out of inner astrometric catalog loop
break
# break out of outer fit algorithm loop either with a fit_rms < 10 or a 'valid' relative fit
if fit_quality == 1 or (
best_fit_qual in [2, 3, 4] and "relative" in algorithm_name
):
break
log.debug("best_fit found to be: {}".format(best_fit_label))
log.debug("FIT_DICT: {}".format(alignment_table.fit_dict.keys()))
# Reset imglist to point to best solution...
alignment_table.select_fit(best_fit_label[0], best_fit_label[1])
imglist = alignment_table.selected_fit
filtered_table = alignment_table.filtered_table
# Report processing time for this step
log.debug(make_label("Processing time of [STEP 5b]", starting_dt))
starting_dt = datetime.datetime.now()
# 6: Populate the filtered_table
log.info(
"{} STEP 6: Collect up information and populate the filtered table "
"{}".format("-" * 20, "-" * 20)
)
if 0 < best_fit_rms < apars["determine_fit_quality"]["MAX_FIT_RMS"]:
log.debug(
"The fitting process was successful with a best fit total "
"rms of {} mas".format(best_fit_rms)
)
else:
log.debug(
"The fitting process was unsuccessful with a best fit total rms "
"of {} mas".format(best_fit_rms)
)
if 0 < best_fit_rms < apars["determine_fit_quality"]["MAX_FIT_LIMIT"]:
# Update filtered table with best fit results
filtered_table["status"][:] = 0
fit_status_dict = best_fit_status_dict.copy()
for item in imglist:
imgname = item.meta["name"]
index = np.where(filtered_table["imageName"] == imgname)[0][0]
# populate self.filtered_table fields "status", "compromised" and
# "processMsg" with fit_status_dict fields "valid", "compromised"
# and "reason".
explicit_dict_key = "{},{}".format(item.meta["name"], item.meta["chip"])
if fit_status_dict[explicit_dict_key]["valid"] is True:
filtered_table[index]["status"] = 0
else:
filtered_table[index]["status"] = 1
if fit_status_dict[explicit_dict_key]["compromised"] is False:
filtered_table["compromised"] = 0
else:
filtered_table["compromised"] = 1
filtered_table[index]["processMsg"] = fit_status_dict[
explicit_dict_key
]["reason"]
filtered_table["fit_qual"][index] = item.meta["fit quality"]
log.debug(make_label("Processing time of [STEP 6]", starting_dt))
starting_dt = datetime.datetime.now()
# 7: Write new fit solution to input image headers
log.info(
"{} STEP 7: Update image headers with new WCS information "
"{}".format("-" * 20, "-" * 29)
)
if (0 < best_fit_rms < 9999.0) and update_hdr_wcs:
# determine the quality of the fit
alignment_table.apply_fit(headerlet_filenames, fit_label=fit_label)
log.debug("SUCCESS")
else:
log.debug(" STEP SKIPPED")
log.debug(make_label("Processing time of [STEP 7]", starting_dt))
log.info(
"TOTAL Processing time of {} sec".format(
(datetime.datetime.now() - zero_dt).total_seconds()
)
)
log.debug(best_fit_status_dict)
log.debug("-" * 104)
log.debug("-" * 104)
log.debug(" SUMMARY OF ALL FIT ATTEMPTS")
for item in fit_info_dict.keys():
log.debug("{} {}".format(item, fit_info_dict[item]))
log.debug("-" * 104)
except Exception:
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_tb, file=sys.stdout)
finally:
# Always make sure that all file handles are closed
alignment_table.close()
# Now update the result with the filtered_table contents
if result:
result.meta = filtered_table.meta
for col in filtered_table.colnames:
result.add_column(filtered_table[col], name=col)
if filtered_table is not None:
filtered_table.pprint(max_width=-1)
return alignment_table
# ----------------------------------------------------------------------------------------------------------
def make_label(label, starting_dt):
"""Create a time-stamped label for use in log messages"""
current_dt = datetime.datetime.now()
delta_dt = (current_dt - starting_dt).total_seconds()
return "{}: {} sec".format(label, delta_dt)
# ----------------------------------------------------------------------------------------------------------
[docs]
def determine_fit_quality(
imglist,
filtered_table,
catalogs_remaining,
align_pars,
print_fit_parameters=True,
loglevel=logutil.logging.NOTSET,
runfile="temp_align.log",
):
"""Determine the quality of the fit to the data
Parameters
----------
imglist : list
output of interpret_fits. Contains sourcelist tables, newly computed WCS info, etc. for every chip of
every valid input image. This list should have been updated, in-place, with the new RMS values;
specifically,
* 'FIT_RMS': RMS of the separations between fitted image positions and reference positions
* 'TOTAL_RMS': mean of the FIT_RMS values for all observations
* 'NUM_FITS': number of images/group_id's with successful fits included in the TOTAL_RMS
These entries are added to the 'fit_info' dictionary.
filtered_table : object
Astropy Table object containing data pertaining to the associated dataset, including
the doProcess bool. It is intended this table is updated by subsequent functions for
bookkeeping purposes.
catalogs_remaining : bool
Specify whether additional catalogs remain to be fit against.
align_pars : dict
Parameters from the appropriate '_align_all.json' parameter file
print_fit_parameters : bool
Specify whether or not to print out FIT results for each chip
log_level : int, optional
The desired level of verboseness in the log statements displayed on the screen and written to the
.log file. Default value is 20, or 'info'.
runfile : string
log file name
Returns
-------
max_rms_val : float
The best Total rms determined from all of the images
num_xmatches: int
The number of stars used in matching the data
fit_quality : int
fit quality category:
* 1 = valid solution with rms < 10 mas
* 2 = Valid but compromised solution with rms < 10 mas
* 3 = Valid solution with RMS >= 10 mas
* 4 = Valid but compromised solution with RMS >= 10 mas
* 5 = Not valid solution
filtered_table : object
modified filtered_table object
fit_status_dict : dictionary
Dictionary containing the following:
* overall fit validity (Boolean)
* total (visit-level) RMS value in mas (float)
* number of matched sources (int)
* fit compromised status (Boolean)
* reason fit is considered 'compromised' (only populated if "compromised" field is "True")
"""
max_rms_val = 1e9
fit_status_dict = {}
xshifts = []
yshifts = []
overall_valid = True
overall_comp = False
# See whether or not 'consistency_check' should be performed.
# This will only be turned on for pipeline processing due to the potentially extreme
# variations in photometry across a visit or in a SkyCell.
do_consistency_check = align_pars["determine_fit_quality"].get(
"do_consistency_check", True
)
auto_good_rms = float(align_pars["determine_fit_quality"]["MAX_FIT_RMS"])
for item in imglist:
if item.meta["fit_info"]["status"].startswith("REFERENCE"):
# Reference will never be modified, so set shifts to (0., 0.)
# so that there are entries for all images.
xshifts.append(0.0)
yshifts.append(0.0)
continue
if not item.meta["fit_info"]["status"].startswith("FAILED"):
xshifts.append(item.meta["fit_info"]["shift"][0])
yshifts.append(item.meta["fit_info"]["shift"][1])
else:
# Fit not successful, so no shifts to collate
break
for item in imglist:
num_xmatches = 0
image_name = item.meta["name"]
chip_num = item.meta["chip"]
fit_info = item.meta["fit_info"]
fitgeom = fit_info["fitgeom"] if "fitgeom" in fit_info else "rscale"
log.debug("\n{}\n".format("-" * 40))
log.debug("FIT being evaluated for {}".format(image_name))
log.debug(fit_info)
log.debug("\n{}\n".format("-" * 40))
# Build fit_status_dict entry
dict_key = "{},{}".format(image_name, chip_num)
fit_status_dict[dict_key] = {
"valid": True,
"max_rms": max_rms_val,
"num_matches": num_xmatches,
"compromised": False,
"reason": "",
}
# Handle fitting failures (no matches found or any other failure in fit)
if fit_info["status"].startswith("FAILED"):
log.warning(
"Alignment FAILED for {} - no processing done.".format(image_name)
)
overall_valid = False
continue
if fit_info["status"].startswith("REFERENCE") or "FIT_RMS" not in fit_info:
# No fit information available for reference image
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = True
fit_status_dict[dict_key]["reason"] = "No fit information"
overall_valid = False
overall_comp = True
continue
fit_rms_val = fit_info["FIT_RMS"]
max_rms_val = fit_info["TOTAL_RMS"]
# fit_rms_ra = fit_info['RMS_RA']
# fit_rms_dec = fit_info['RMS_DEC']
# rms_ratio = abs(fit_rms_ra - fit_rms_dec) / min(fit_rms_ra, fit_rms_dec)
num_xmatches = len(fit_info["ref_mag"]) # fit_info['nmatches']
fit_status_dict[dict_key]["max_rms"] = max_rms_val
fit_status_dict[dict_key]["num_matches"] = num_xmatches
if num_xmatches < align_pars["general"]["MIN_FIT_MATCHES"]:
overall_valid = False
if catalogs_remaining:
log.warning(
"Not enough cross matches found between astrometric"
" catalog and sources found in {} ()".format(image_name, num_xmatches)
)
continue
# Compute correlation between input and GAIA magnitudes
# This check will only be performed when the fit may be uncertain
# due to less than 100 matches.
ref_cat_limit = min(1000, item.meta["num_ref_catalog"])
log.info(
"MAG CHECK REF_CAT_LIMIT: {} XMATCHES: {}".format(
ref_cat_limit, num_xmatches
)
)
if num_xmatches < max(0.1 * ref_cat_limit, 10):
cross_match_check = amutils.check_mag_corr([item])[0]
log.info(
"Cross-match check: {} on {} ref sources".format(
cross_match_check, item.meta["num_ref_catalog"]
)
)
else:
cross_match_check = True
# Execute checks
nmatches_check = False
if item.meta['fit method']!='relative':
required_nmatches = align_pars["run_align"]["mosaic_fitgeom_list"][fitgeom]
else:
required_nmatches = align_pars["run_align"]["mosaic_relgeom_list"][fitgeom]
# If the number of matches is more than requirement set in configuration (json) files
# OR the fit RMS is higher --> fit is compromised below
if num_xmatches >= required_nmatches or (num_xmatches >= 2 and fit_rms_val > 0.5):
nmatches_check = True
radial_offset_check = False
radial_offset = (
math.sqrt(
float(fit_info["shift"][0]) ** 2 + float(fit_info["shift"][1]) ** 2
)
* item.wcs.pscale
) # radial offset in arssec
# Without the '+2', this will always fail for xmatches < 3 regardless of how small
# the offset is: 2*0.36 == 0.72 vs 0.8 + 0 [perfect alignment]
# Adding 2 allows low offset solutions with only 1 or 2 sources to pass this check.
if float(num_xmatches + 2) * 0.36 > 0.8 + (radial_offset / 10.0) ** 8:
radial_offset_check = True
large_rms_check = True
max_fit_limit = align_pars["determine_fit_quality"]["MAX_FIT_LIMIT"]
if fit_rms_val > max_fit_limit or max_rms_val > max_fit_limit:
large_rms_check = False
# fitRmsCheck = False
# if fit_rms_val < max_rms_val:
# fitRmsCheck = True
consistency_check = True
if do_consistency_check:
rms_limit = max(fit_info["TOTAL_RMS"], auto_good_rms)
if not math.sqrt(
np.std(np.asarray(xshifts)) ** 2 + np.std(np.asarray(yshifts)) ** 2
) <= (rms_limit / align_pars["determine_fit_quality"]["MAS_TO_ARCSEC"]) / (
item.wcs.pscale
): # \
# or rms_ratio > MAX_RMS_RATIO:
consistency_check = False
# Decide if fit solutions are valid based on checks
if not consistency_check: # Failed consistency check
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = "Consistency violation!"
elif not large_rms_check: # RMS value(s) too large
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = "RMS too large (>150 mas)!"
elif not radial_offset_check: # Failed radial offset check
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = "Radial offset value too large!"
elif not nmatches_check: # Too few matches
fit_status_dict[dict_key]["valid"] = False
fit_status_dict[dict_key]["compromised"] = True
fit_status_dict[dict_key]["reason"] = "Too few matches!"
elif not cross_match_check:
fit_status_dict[dict_key]["valid"] = True
fit_status_dict[dict_key]["compromised"] = True
fit_status_dict[dict_key][
"reason"
] = "Cross-match magnitudes not correlated!"
else: # all checks passed. Valid solution.
fit_status_dict[dict_key]["valid"] = True
fit_status_dict[dict_key]["compromised"] = False
fit_status_dict[dict_key]["reason"] = ""
# for now, generate overall valid and compromised values. Basically, if any of the entries for "valid" is False,
# "valid" is False, treat the whole dataset as not valid. Same goes for compromised.
if not fit_status_dict[dict_key]["valid"]:
overall_valid = False
if fit_status_dict[dict_key]["compromised"]:
overall_comp = True
log.info(
"RESULTS FOR {} Chip {}: FIT_RMS = {} mas, TOTAL_RMS = {}"
" mas, NUM = {}".format(
image_name, item.meta["chip"], fit_rms_val, max_rms_val, num_xmatches
)
)
# print fit params to screen
if print_fit_parameters:
log_info_keys = [
"status",
"fitgeom",
"eff_minobj",
"matrix",
"shift",
"center",
"proper_rot",
"proper",
"<rot>",
"<scale>",
"skew",
"rmse",
"mae",
"nmatches",
"FIT_RMS",
"TOTAL_RMS",
"NUM_FITS",
"RMS_RA",
"RMS_DEC",
"catalog",
]
log.info("{} FIT PARAMETERS {}".format("~" * 35, "~" * 34))
log.info("image: {}".format(image_name))
log.info("chip: {}".format(item.meta["chip"]))
log.info("group_id: {}".format(item.meta["group_id"]))
for tweakwcs_info_key in log_info_keys:
log.info(
"{} : {}".format(tweakwcs_info_key, fit_info[tweakwcs_info_key])
)
log.info("~" * 84)
log.info(
"nmatches_check: {} radial_offset_check: {}"
" large_rms_check: {},"
" consistency_check: {}".format(
nmatches_check,
radial_offset_check,
large_rms_check,
consistency_check,
)
)
# determine which fit quality category this latest fit falls into
if overall_valid is False:
fit_quality = 5
log.info("FIT SOLUTION REJECTED")
filtered_table["status"][:] = 1
for ctr in range(0, len(filtered_table)):
imgname = filtered_table[ctr]["imageName"] + ",1"
if imgname in fit_status_dict:
filtered_table[ctr]["processMsg"] = fit_status_dict[imgname]["reason"]
else:
filtered_table[ctr]["processMsg"] = "Not a valid exposure"
else:
for ctr in range(0, len(filtered_table)):
filtered_table[ctr]["processMsg"] = ""
if overall_comp is False and max_rms_val < auto_good_rms:
log.info("Valid solution with RMS < {} mas found!".format(auto_good_rms))
fit_quality = 1
elif overall_comp is True and max_rms_val < auto_good_rms:
log.info(
"Valid but compromised solution with RMS < {} mas found!".format(
auto_good_rms
)
)
fit_quality = 2
elif overall_comp is False and 1000.0 >= max_rms_val >= auto_good_rms:
log.info("Valid solution with RMS >= {} mas found!".format(auto_good_rms))
fit_quality = 3
else:
log.info(
"Valid but compromised solution with RMS >= {} mas found!".format(
auto_good_rms
)
)
fit_quality = 4
if print_fit_parameters:
for item in imglist:
log.info(
fit_status_dict["{},{}".format(item.meta["name"], item.meta["chip"])]
)
if max_rms_val > auto_good_rms:
log.info(
"Total fit RMS value = {} mas greater than the maximum threshold value {}.".format(
max_rms_val, auto_good_rms
)
)
if not overall_valid:
log.info("The fit solution for some or all of the images is not valid.")
if max_rms_val > auto_good_rms or not overall_valid:
log.info("Trying again with the next catalog, method, or geometry depending upon the current fitting cycle.")
else:
log.info("Fit calculations successful.")
return max_rms_val, num_xmatches, fit_quality, filtered_table, fit_status_dict
# ----------------------------------------------------------------------------------------------------------------------
def determine_fit_quality_mvm_interface(
imglist,
filtered_table,
catalogs_remaining,
ref_catalog_length,
align_pars,
print_fit_parameters=True,
loglevel=logutil.logging.NOTSET,
runfile="temp_align.log",
):
"""Simple interface to allow MVM code to use determine_fit_quality().
Parameters
----------
imglist : list
output of interpret_fits. Contains sourcelist tables, newly computed WCS info, etc. for every chip of
every valid input image. This list should have been updated, in-place, with the new RMS values;
specifically,
* 'FIT_RMS': RMS of the separations between fitted image positions and reference positions
* 'TOTAL_RMS': mean of the FIT_RMS values for all observations
* 'NUM_FITS': number of images/group_id's with successful fits included in the TOTAL_RMS
These entries are added to the 'fit_info' dictionary.
filtered_table : object
Astropy Table object containing data pertaining to the associated dataset, including
the doProcess bool. It is intended this table is updated by subsequent functions for
bookkeeping purposes.
catalogs_remaining : bool
Specify whether additional catalogs remain to be fit against.
align_pars : dict
Parameters from the appropriate '_align_all.json' parameter file
print_fit_parameters : bool
Specify whether or not to print out FIT results for each chip
log_level : int, optional
The desired level of verboseness in the log statements displayed on the screen and written to the
.log file. Default value is 20, or 'info'.
runfile : string
log file name
Returns
-------
is_good_fit : bool
Is the fit acceptable (Is fit_quality value in GOOD_FIT_QUALITY_VALUES)?
max_rms_val : float
The best Total rms determined from all of the images
num_xmatches: int
The number of stars used in matching the data
fit_quality : int
fit quality category:
* 1 = valid solution with rms < 10 mas
* 2 = Valid but compromised solution with rms < 10 mas
* 3 = Valid solution with RMS >= 10 mas
* 4 = Valid but compromised solution with RMS >= 10 mas
* 5 = Not valid solution
filtered_table : object
modified filtered_table object
fit_status_dict : dictionary
Dictionary containing the following:
* overall fit validity (Boolean)
* total (visit-level) RMS value in mas (float)
* number of matched sources (int)
* fit compromised status (Boolean)
* reason fit is considered 'compromised' (only populated if "compromised" field is "True")
"""
log.setLevel(loglevel)
# Check if num_ref_catalog is in imglist...if not, add it.
for ctr in range(0, len(imglist)):
if "num_ref_catalog" not in imglist[ctr].meta.keys():
imglist[ctr].meta["num_ref_catalog"] = ref_catalog_length
# Execute determine_fit_quality
(
fit_rms,
fit_num,
fit_quality,
filtered_table,
fit_status_dict,
) = determine_fit_quality(
imglist,
filtered_table,
catalogs_remaining,
align_pars,
print_fit_parameters=print_fit_parameters,
loglevel=loglevel,
runfile=runfile,
)
# Determine if the fit quality is acceptable
if fit_quality in align_pars["determine_fit_quality"]["GOOD_FIT_QUALITY_VALUES"]:
is_good_fit = True
else:
is_good_fit = False
return is_good_fit, fit_rms, fit_num, fit_quality, filtered_table, fit_status_dict
# ----------------------------------------------------------------------------------------------------------------------
def generate_astrometric_catalog(imglist, **pars):
"""Generates a catalog of all sources from an existing astrometric catalog are
in or near the FOVs of the images in the input list.
Parameters
----------
imglist : list
List of one or more calibrated fits images that will be used for catalog generation.
output : str, optional
If specified as part of input pars dict, it provides the name of the output catalog file
overwrite : bool, optional
If specified as part of the input pars dict, it specifies whether or not to overwrite any
catalog file already present with the same path/filename as specified in `output`.
Returns
=======
ref_table : object
Astropy Table object of the catalog
"""
# generate catalog
temp_pars = pars.copy()
if pars["output"] is True:
pars["output"] = "ref_cat.ecsv"
else:
pars["output"] = None
overwrite = pars.get("clobber", True)
out_catalog = amutils.create_astrometric_catalog(imglist, **pars)
pars = temp_pars.copy()
# if the catalog has contents, write the catalog to ascii text file
if len(out_catalog) > 0 and pars["output"]:
catalog_filename = "refcatalog.cat"
out_catalog.write(
catalog_filename, format="ascii.fast_commented_header", overwrite=overwrite
)
log.debug("Wrote reference catalog {}.".format(catalog_filename))
return out_catalog
# ----------------------------------------------------------------------------------------------------------------------
def get_default_pars(
instrument,
detector,
step="alignment",
condition=["filter_basic"],
hap_pipeline_name="svm",
):
step_list = config_utils.step_title_list
if step not in step_list:
log.error("{} not valid! Needs to be one of: {}".format(step, step_list))
raise ValueError
full_cfg_index, pars_dir = config_utils.read_index(
instrument, detector, hap_pipeline_name=hap_pipeline_name
)
par_class = config_utils.step_name_list[step_list.index(step)]
apars = par_class(
full_cfg_index[step], condition, hap_pipeline_name, pars_dir, step, True, None
)
return apars.outpars