"""Utilities to support creation of Hubble Advanced Pipeline(HAP) products.
"""
import sys
import os
import shutil
import numpy as np
from astropy.io import fits as fits
from astropy.io.fits import Column
from astropy.time import Time
from stsci.tools import logutil
from stsci.tools.fileutil import countExtn
from stwcs import wcsutil
from .cell_utils import SkyFootprint, get_sky_cells
LEVEL_DEFS = {1: 'single exposure product', 2: 'filter product', 3: 'total detection product'}
HAPCOLNAME = 'HAPEXPNAME'
PHOT_KEYWORDS = ['PHOTMODE', 'PHOTFLAM', 'PHOTFNU', 'PHOTZPT', 'PHOTPLAM', 'PHOTBW']
__taskname__ = 'processing_utils'
log = logutil.create_logger(__name__, level=logutil.logging.NOTSET, stream=sys.stdout)
def get_rules_file(product, rules_type="", rules_root=None):
"""Copies default HAP rules file to local directory.
This function enforces the naming convention for rules files
provided as part of this package; namely,
<instrument name>_{<rules type>_}_header_hap.rules
where <instrument name> is the lower-case value of the INSTRUME keyword
for the exposure, and, optionally, <rules type> is the lower-case
value of the ``rules_type`` parameter.
The default rules file will be found in the package's installation directory,
renamed for the input exposure and copied into the local working directory.
This will allow AstroDrizzle to find the rules file when drizzling the exposure.
Parameters
----------
product : str
Filename of the input exposure that the rules file applies to
rules_type : str, optional
Specifies the type of processing being performed on the input
exposure. Valid values: blank/empty string (''), 'SVM' or 'svm'
for SVM processing (default) and 'MVM' or 'mvm' for MVM processing.
rules_root : str, optional
Name of output product to use as the rootname for the output rules file.
Specifying this filename indicates that the default rules for combining
multiple inputs should be used.
If None, output rules file will be derived from the product filename and
indicates that the rules file should be specific to a single input.
Returns
-------
new_rules_name : str
filename of the new rules file copied into the current working directory
"""
# Interpret rules_type to guard against a variety of possible inputs
# and to insure all values are converted to lower-case for use in
# filenames
rules_type = "" if rules_type == None else rules_type.strip(' ').lower()
hdu, closefits = _process_input(product)
phdu = hdu[0].header
instrument = phdu['instrume']
# Append '_single' to rules_type if single image product
if rules_root is None:
if rules_type == "":
rules_type = 'single'
else:
rules_type = '_'.join([rules_type, 'single'])
rootname = '_'.join(product.split("_")[:-1])
else:
rootname = '_'.join(rules_root.split("_")[:-1])
# Create rules name prefix here
# The trailing rstrip guards against a blank rules_type, which
# is the default for SVM processing.
rules_prefix = '_'.join([instrument.lower(), rules_type]).rstrip('_')
code_dir = os.path.abspath(__file__)
base_dir = os.path.dirname(os.path.dirname(code_dir))
def_rules_name = "{}_header_hap.rules".format(rules_prefix)
new_rules_name = "{}_header_hap.rules".format(rootname)
rules_filename = os.path.join(base_dir, 'pars', def_rules_name)
new_rules_filename = os.path.join(os.getcwd(), new_rules_name)
log.debug(f'Copying \n\t{rules_filename} \nto \n\t{new_rules_filename}')
if new_rules_name not in os.listdir('.'):
shutil.copy(rules_filename, new_rules_filename)
return new_rules_name
def build_logname(input_filename, process_type='svm'):
"""Build the log filename based on the input filename"""
suffix = '.{}'.format(input_filename.split('.')[-1])
if isinstance(input_filename, str): # input file is a poller file -- easy case
logname = input_filename.replace(suffix, '.log')
else:
logname = '{}_process.log'.format(process_type)
return logname
def get_acs_filters(image, delimiter=';', all=False):
hdu, closefits = _process_input(image)
filters = [kw[1] for kw in hdu[0].header['filter?'].items()]
acs_filters = []
for f in filters:
if ('clear' not in f.lower() and not all) or all:
acs_filters.append(f)
if not acs_filters:
acs_filters = ['clear']
acs_filters = delimiter.join(acs_filters)
if closefits:
hdu.close()
return acs_filters
def get_wfpc2_filters(image, delimiter=';', all=False):
hdu, closefits = _process_input(image)
filters = [kw[1] for kw in hdu[0].header['filtnam?'].items()]
wfpc2_filters = []
for f in filters:
if (f.strip() != '' and not all) or all:
wfpc2_filters.append(f)
if not wfpc2_filters:
wfpc2_filters = ['clear']
wfpc2_filters = delimiter.join(wfpc2_filters)
wfpc2_filters = wfpc2_filters.rstrip(delimiter)
if closefits:
hdu.close()
return wfpc2_filters
def update_hdrtab(image, level, total_obj_list, input_exposures):
"""Build HAP entry table extension for product"""
# Convert input_exposure filenames into HAP product filenames
name_col = []
orig_tab = image['hdrtab'].data
# get the name of the product so it can be selected from
# the total_obj_list for updating
update_filename = image[0].header['filename']
for tot_obj in total_obj_list:
# Get the HAPProduct object for the input image to be updated
# The '.find_member()' method looks for exposure, filter and
# total level product.
img_obj = tot_obj.find_member(update_filename)
if img_obj is None:
# Didn't find the input image in this total_obj instance,
# try another...
continue
# if tot_obj.drizzle_filename != update_filename:
# continue
# Only for the total_obj_list entry that matches the input image
# should we build the list of new rootnames
for row in orig_tab:
rootname = str(row['rootname'])
# The rootname is ipppssoot, but the expname is only contains ipppssoo,
# so remove the last character for the comparisons
rootname = rootname[0:-1]
for expname in input_exposures:
if rootname in expname:
# Convert input exposure names into HAP names
for exposure in tot_obj.edp_list:
if rootname in exposure.full_filename:
name_col.append(exposure.product_basename)
break
hdrtab_cols = orig_tab.columns
if name_col:
# define new column with HAP expname and ensure the column
# will accommodate a string of at least 51 characters which handles an ACS image
max_len = max(max([len(name) for name in name_col]), 51)
hapcol = Column(array=np.array(name_col, dtype=np.str_), name=HAPCOLNAME, format='{}A'.format(max_len))
newcol = fits.ColDefs([hapcol])
hdrtab_cols += newcol
# define new extension
haphdu = fits.BinTableHDU.from_columns(hdrtab_cols)
haphdu.header['extname'] = 'HDRTAB'
haphdu.header['extver'] = 1
# remove old extension
del image['hdrtab']
# replace with new extension
image.append(haphdu)
[docs]
def compute_sregion(image, extname='SCI'):
"""Compute the S_REGION keyword for a given WCS.
Parameters
-----------
image : Astropy io.fits HDUList object
Image to update with the S_REGION keyword in each of the SCI extensions.
extname : str, optional
EXTNAME value for extension containing the WCS(s) to be updated
"""
# This function could, conceivably, be called directly...
hdu, closefits = _process_input(image)
# Find all extensions to be updated
numext = countExtn(hdu, extname=extname)
for extnum in range(1, numext + 1):
sciext = (extname, extnum)
if 'd001data' not in hdu[0].header:
sregion_str = 'POLYGON ICRS '
# Working with FLT/FLC file, so simply use
# the array corners directly
extwcs = wcsutil.HSTWCS(hdu, ext=sciext)
footprint = extwcs.calc_footprint(center=True)
for corner in footprint:
sregion_str += '{} {} '.format(corner[0], corner[1])
# Close the polygon
sregion_str += '{} {} '.format(footprint[0][0], footprint[0][1])
else:
if hdu[(extname, extnum)].data.min() == 0 and hdu[(extname, extnum)].data.max() == 0:
continue
# Working with a drizzled image, so we need to
# get all the corners from each of the input files
footprint = find_footprint(hdu, extname=extname, extnum=extnum)
sregion_str = ''
for region in footprint.corners:
# S_REGION string should contain a separate POLYGON
# for each region or chip in the SCI array
sregion_str += 'POLYGON ICRS '
for corner in region:
sregion_str += '{} {} '.format(corner[0], corner[1])
hdu[sciext].header['s_region'] = sregion_str
# close file if opened by this functions
if closefits:
hdu.close()
def add_skycell_to_header(image_filename, extname='SCI'):
"""Determines the skycells for which the image falls within and adds the
information to the science header. The SKYCELL value for IPPPSSOOT and SVM
products may be different as the current computation is based upon the WCSNAME
of the input images.
Parameters
----------
image : str
Image filename to update with the skycell keyword in each extname extension.
extname : str, optional
EXTNAME value for extension containing the header to be updated
Returns
-------
Nothing; image header has been updated unless keyword already exists.
"""
# Open image and determine whether to consequently close it
try:
hdu, closefits = _process_input(image_filename)
except:
log.error(f"Could not open {image_filename} during add_skycell_to_header. Exiting.")
# Find all extensions to be updated
numext = countExtn(hdu, extname=extname)
for extver in range(1, numext + 1):
sciext = (extname, extver)
skycells = get_sky_cells([image_filename])
if skycells:
shortened_skycells = [x[8:] for x in list(skycells.keys())] # remove 'skycell_' from the keys
skycell_string = '; '.join(shortened_skycells) # join the keys into a string
# inserts keyword after s_region if it exists, otherwise updates value
if 'skycell' not in hdu[sciext].header:
log.debug("Adding skycell keyword.")
hdu[sciext].header.insert(
"s_region",
("skycell", skycell_string, "Skycell(s) that this image occupies"),
after=True,
)
else:
log.debug("Updating skycell keyword.")
hdu[sciext].header["skycell"]= (skycell_string, 'Skycell(s) that this image occupies')
else:
log.error(f"No skycells found for {image_filename}.")
# close file if opened by this functions
if closefits:
hdu.close()
def find_footprint(hdu, extname='SCI', extnum=1):
"""Extract the footprints from each input file
Determine the composite of all the input chip's corners
as the footprint for the entire set of overlapping images
that went into creating this drizzled image.
Parameters
===========
hdu : str or `~fits.HDUList`
Filename or HDUList for a drizzled image
extname : str, optional
Name of science array extension (extname value)
extnum : int, optional
EXTVER value for science array extension
Returns
========
footprint : ndarray
Array of RA/Dec for the 4 corners that comprise the
footprint on the sky for this mosaic. Values were
determined from northern-most corner counter-clockwise
to the rest.
"""
# extract WCS from this product
meta_wcs = wcsutil.HSTWCS(hdu, ext=(extname, extnum))
# create SkyFootprint object for all input_files to determine footprint
footprint = SkyFootprint(meta_wcs=meta_wcs)
# create mask of all input chips as they overlap on the product WCS
footprint.extract_mask(hdu.filename())
# Now, find the corners from this mask
footprint.find_corners()
return footprint
def interpret_sregion(image, extname='SCI'):
"""Interpret the S_REGION keyword as a list of RA/Dec points"""
# This function could, conceivably, be called directly...
hdu, closefits = _process_input(image)
# Find all extensions to be updated
numext = countExtn(hdu, extname=extname)
sregions = []
for extnum in range(1, numext + 1):
sregions.append(fits.getval(image, 's_region', ext=(extname, extnum)))
coords = []
for region_str in sregions:
regions = region_str.split('POLYGON ICRS ')
regions = [val for val in regions if val.strip(' ') != '']
for region in regions:
radec_str = np.array(region.strip(' ').split(' '), dtype=np.float64)
coords.append(radec_str.reshape((radec_str.shape[0] // 2, 2)))
return coords
def _process_input(input):
"""Verify that input is an Astropy HDUList object opened in 'update' mode
Parameters
----------
input : str or object
Filename of input image or HDUList object for image to be processed
Returns
--------
hdu : object
Astropy HDUList object of input opened in
closefits : bool
Boolean which indicates whether input should be closed when processing has been completed.
"""
# Process input product
if isinstance(input, str):
hdu = fits.open(input, mode='update')
closefits = True
else:
hdu = input
closefits = False
# Insure that input has been opened in update mode to allow for new values to be saved to headers
if hdu._file.mode != 'update':
filename = hdu.filename()
if filename:
hdu.close()
hdu = fits.open(filename, mode='update')
closefits = True
else:
log.error("Input object could not be opened in 'update' mode.")
raise ValueError
return hdu, closefits
def append_trl_file(trlfile, drizfile, clean=True):
""" Append log file to already existing log or trailer file.
Parameters
-----------
clean : bool
Remove the ``drizfile`` or not when finished appending it to ``trlfile``
"""
if not os.path.exists(drizfile):
return
# Open already existing trailer file for appending
ftrl = open(trlfile, 'a')
# Open astrodrizzle trailer file
fdriz = open(drizfile)
# Read in drizzle comments
_dlines = fdriz.readlines()
# Append them to CALWF3 trailer file
ftrl.writelines(_dlines)
# Close all files
ftrl.close()
fdriz.close()
if clean:
# Now, clean up astrodrizzle trailer file
os.remove(drizfile)
def make_section_str(str="", width=60, symbol='-'):
"""Generate string for starting/ending sections of log messages"""
strlen = len(str)
dash_len = width - strlen if strlen < width else 0
side_dash = symbol * (dash_len // 2)
section_str = "{}{}{}".format(side_dash, str, side_dash)
return section_str
def find_flt_keyword (hdu, keyword, extname="SCI", extver=1):
"""Look for the FITS keyword in the Primary and specified extension header
This routine deliberately does not use astropy.io.fits getval() as it
should not be used in application code due to its self-documented inefficiency.
This routine is only a STOPGAP to be used as a backstop to handle the
issue when refine_product_headers() does not successfully update the drizzle
product headers.
Parameters
----------
hdu : HDUList
List of Header Data Unit objects
keyword : str
Keyword of interest
extname : str
Named extension to check in case the keyword is not found in the Primary
e.g., (extname, extver) ("SCI", 2)
extver : int
Specified extver of an extname to check in case the keyword is not found in the Primary
e.g., (extname, extver) ("SCI", 2)
Returns
-------
value : float
"""
# Look for the keyword in the Primary header
try:
value = hdu[0].header[keyword]
log.info("Read keyword {} from the Primary header.".format(keyword))
# Oops. Look for the keyword in the extension
except KeyError:
try:
sciext = (extname, extver)
value = hdu[sciext].header[keyword]
log.info("Read keyword from the {} header.".format(sciext))
except KeyError as err:
log.error("Keyword {} does not exist in the Primary or {} extension.".format(keyword, sciext))
raise
# Now ensure the returned variable is of the proper type
try:
value = float(value)
log.info("Keyword {}: {}.".format(keyword, value))
except (ValueError, TypeError) as err:
log.error("The value of keyword, {}, cannot be cast as a floating point value.".format(keyword))
raise
return value
def _find_open_files():
"""Useful utility function to identify any open file handles during processing."""
import psutil
cwd = os.getcwd()
log.info("Checking for open files in {}".format(cwd))
open_files = []
for proc in psutil.process_iter():
try:
for pfile, pid in proc.open_files():
if os.path.dirname(pfile) == cwd:
open_files.append(pfile)
except psutil.AccessDenied:
continue
except FileNotFoundError:
continue
return open_files