"""Utilities to interpret the pipeline poller obset information and generate product filenames
The function, interpret_obset_input, parses the file generated by the pipeline
poller, and produces a tree listing of the output products. The function,
parse_obset_tree, converts the tree into product catagories.
"""
from collections import OrderedDict
import os
import shutil
import sys
import numpy as np
import copy
from sklearn.cluster import KMeans
from stsci.tools import logutil
from astropy.io import fits
from astropy.io import ascii
from astropy.io.fits import getheader
from astropy.table import Table, Column
from drizzlepac import util
from drizzlepac.haputils.product import (
ExposureProduct,
FilterProduct,
TotalProduct,
GrismExposureProduct,
)
from drizzlepac.haputils.product import SkyCellProduct, SkyCellExposure
from . import analyze
from . import astroquery_utils as aqutils
from . import processing_utils
from . import cell_utils
# Define information/formatted strings to be included in output dict
SEP_STR = "single exposure product {:02d}"
FP_STR = "filter product {:02d}"
TDP_STR = "total detection product {:02d}"
# Define the mapping between the first character of the filename and the associated instrument
INSTRUMENT_DICT = {
"i": "WFC3",
"j": "ACS",
"o": "STIS",
"u": "WFPC2",
"x": "FOC",
"w": "WFPC",
}
POLLER_COLNAMES = [
"filename",
"proposal_id",
"program_id",
"obset_id",
"exptime",
"filters",
"detector",
"pathname",
]
POLLER_DTYPE = ("str", "int", "str", "str", "float", "object", "str", "str")
MVM_POLLER_COLNAMES = [
"filename",
"proposal_id",
"program_id",
"obset_id",
"exptime",
"filters",
"detector",
"skycell_id",
"skycell_new",
"pathname",
]
MVM_POLLER_DTYPE = (
"str",
"int",
"str",
"str",
"float",
"object",
"str",
"str",
"str",
"str",
)
BOOL_STR_DICT = {
"TRUE": True,
"FALSE": False,
"T": True,
"F": False,
"1": True,
"0": False,
}
EXP_LABELS = {2: "long", 1: "med", 0: "short", None: "all"}
EXP_LIMITS = [0, 15, 120]
SUPPORTED_EXP_METHODS = {"kmeans", "hard", "all"}
__taskname__ = "poller_utils"
MSG_DATEFMT = "%Y%j%H%M%S"
SPLUNK_MSG_FORMAT = "%(asctime)s %(levelname)s src=%(name)s- %(message)s"
log = logutil.create_logger(
__name__,
level=logutil.logging.NOTSET,
stream=sys.stdout,
format=SPLUNK_MSG_FORMAT,
datefmt=MSG_DATEFMT,
)
# -----------------------------------------------------------------------------
# Single Visit Processing Functions
#
# Translate the database query on an obset into actionable lists of filenames
[docs]
def build_obset_tree(obset_table):
"""Convert obset table into a tree listing all products to be created."""
# Each product will consist of the appropriate string as the key and
# a dict of 'info' and 'files' information
# Start interpreting the obset table
obset_tree = {}
for row in obset_table:
# Get some basic information from the first row - no need to check
# for multiple instruments as data from different instruments will
# not be combined.
det = row["detector"]
orig_filt = row["filters"]
# Potentially need to manipulate the 'filters' string for instruments
# with two filter wheels
filt = determine_filter_name(orig_filt)
row["filters"] = filt
row_info, filename = create_row_info(row)
# Initial population of the obset tree for this detector
if det not in obset_tree:
obset_tree[det] = {}
obset_tree[det][filt] = [(row_info, filename)]
else:
det_node = obset_tree[det]
if filt not in det_node:
det_node[filt] = [(row_info, filename)]
else:
det_node[filt].append((row_info, filename))
return obset_tree
def create_row_info(row):
"""Build info string for a row from the obset table"""
info_list = [
str(row["proposal_id"]),
"{}".format(row["obset_id"]),
row["instrument"],
row["detector"],
row["aperture"],
row["filename"][: row["filename"].find("_")],
row["filters"],
]
return " ".join(map(str.upper, info_list)), row["filename"]
# -----------------------------------------------------------------------------
# Multi-Visit Processing Functions
#
# Translate the database query on an obset into actionable lists of filenames
def build_mvm_tree(obset_table):
"""Convert obset table into a tree listing all products to be created."""
# Each product will consist of the appropriate string as the key and
# a dict of 'info' and 'files' information
# Start interpreting the obset table
obset_tree = {}
for row in obset_table:
# Get some basic information from the first row - no need to check
# for multiple instruments as data from different instruments will
# not be combined.
det = row["detector"]
orig_filt = row["filters"]
exp_layer = row["exp_layer"]
year_layer = row["year_layer"]
skycell = row["skycell_id"]
# Potentially need to manipulate the 'filters' string for instruments
# with two filter wheels
filt = determine_filter_name(orig_filt)
row["filters"] = filt
# Define the key for the sky cell layer based on skycell, filter,
# then optionally exposure times and/or observation date.
layer = (skycell, filt, exp_layer, year_layer)
# Insure that no matter how 'exp_layer' gets turned off, it gets recognized.
if exp_layer in ["ALL", "all", "None", "", " ", None]:
# Turn off use of 'exp_layer' as an output layer for the sky cell.
layer = None
row_info, filename = create_mvm_info(row)
# Define key for combined sky cell layer (all exposure times in one layer)
row_info_all = row_info.split(" ")
row_info_all[4] = "ALL"
row_info_all = " ".join(row_info_all)
layer_all = (skycell, filt, "all", year_layer)
# Initial population of the obset tree for this detector
if det not in obset_tree:
obset_tree[det] = {}
if layer:
obset_tree[det][layer] = [(row_info, filename)]
obset_tree[det][layer_all] = [(row_info_all, filename)]
else:
det_node = obset_tree[det]
if layer:
if layer not in det_node:
det_node[layer] = [(row_info, filename)]
else:
det_node[layer].append((row_info, filename))
if layer_all not in det_node:
det_node[layer_all] = [(row_info_all, filename)]
else:
det_node[layer_all].append((row_info_all, filename))
return obset_tree
def create_mvm_info(row):
"""Build info string for a row from the obset table"""
info_list = [
str(row["skycell_id"]),
row["instrument"],
row["detector"],
row["filters"],
str(row["exp_layer"]),
str(row["year_layer"]),
str(row["skycell_new"]),
]
# info_list = [str(row['proposal_id']), "{}".format(row['obset_id']), row['instrument'],
# row['detector'], row['filters'], row['exp_layer'], row['year_layer']]
return " ".join(map(str.upper, info_list)), row["filename"]
def parse_mvm_tree(det_tree, all_mvm_exposures, log_level):
"""Convert tree into products for sky cell layers
Tree generated by `~create_mvm_info` will always have the following
levels:
* detector
* filters
* layer
Each exposure will have an entry dict with keys 'info' and 'filename'.
Products created will be:
* a single sky cell layer product per 'detector/filter/layer' combination
"""
log.setLevel(log_level)
# Initialize products dict
obset_products = {}
# For each level, define a product, starting with the detector used...
prev_det_indx = 0
det_indx = 0
filt_indx = 0
filt_indx_inc = 1
tdp_list = []
# Setup products for each detector used
#
# det_tree = {'UVIS': {'f200lp': [('skycell_p1234_x01y01 WFC3 UVIS IACS01T9Q F200LP 1', 'iacs01t9q_flt.fits'),
# ('skycell_p1234_x01y01 WFC3 UVIS IACS01TBQ F200LP 1', 'iacs01tbq_flt.fits')]},
# 'IR': {'f160w': [('skycell_p1234_x01y01 WFC3 IR IACS01T4Q F160W 1', 'iacs01t4q_flt.fits')]}}
for filt_tree in det_tree.values():
det_indx += 1
# Find all filters used...
# filt_tree = {'f200lp': [('skycell_p1234_x01y01 WFC3 UVIS IACS01T9Q F200LP 1', 'iacs01t9q_flt.fits'),
# ('skycell_p1234_x01y01 WFC3 UVIS IACS01TBQ F200LP 1', 'iacs01tbq_flt.fits')]}
for filter_files in filt_tree.values():
# Use this to create and populate filter product dictionary entry
fprod = FP_STR.format(filt_indx)
obset_products[fprod] = {"info": "", "files": []}
# Populate single exposure dictionary entry now as well
# filename = ('skycell_p1234_x01y01 WFC3 UVIS IACS01T9Q F200LP 1', 'iacs01t9q_flt.fits')
#
filter_members, filetype = select_common_filetype(filter_files)
for filename, is_member in zip(filter_files, filter_members):
if det_indx != prev_det_indx:
prev_det_indx = det_indx
# If this input exposure does not have the same filetype (FLT/FLC) as the
# rest of the images in this layer, ignore it.
if not is_member:
continue
# Generate the full product dictionary information string:
#
# [row['proposal_id'], row['obset_id'], row['instrument'],
# row['detector'], row['filters'], row['exp_layer'], row['year_layer']]
#
prod_info = (filename[0] + " " + filetype).lower()
#
# mvm prod_info = 'skycell_p1234_x01y01 wfc3 uvis f200lp all 2009 1 drz'
# prod_list is:
# skycell, instrument, detector, filters, exposure_layer, year_layer, old_or_new, and filetype (CTE corrected or not).
prod_list = prod_info.split(" ")
multi_scale = prod_list[2].upper() in ["IR", "PC"]
pscale = "fine" if not multi_scale else "coarse"
prod_info += " {:s}".format(pscale)
if prod_list[5].strip() != "":
layer = (prod_list[3], pscale, prod_list[4], prod_list[5])
else:
layer = (prod_list[3], pscale, prod_list[4])
ftype = prod_list[-1]
cellid = prod_list[0].split("-")[1]
xindx = cellid.index("x")
prop_id = cellid[1:xindx] # projection cell number
obset_id = cellid[xindx:] # x and y skycell coordniates
# Create a single sky cell exposure product object
# __init__(self, prop_id, obset_id, instrument, detector, filename, filters, filetype, log_level)
sep_obj = SkyCellExposure(
prop_id,
obset_id,
prod_list[1],
prod_list[2],
"empty_aperture", #aperture
filename[1],
layer,
ftype,
log_level,
)
# set flag to record whether this is a 'new' exposure or one that
# has already been aligned to a layer already:
# 1 means it is 'new' to the layer
# 0 means it is being reprocessed
#
sep_obj.new_process = int(prod_list[-2])
# Set up the filter product dictionary and create a filter product object
# The SkyCellProduct is the MVM equivalent of the SVM FilterProduct. Each SkyCellProduct
# is comprised of a list of exposures in the visit which were taken with the same filter.
# Initialize `info` key for this filter product dictionary
if not obset_products[fprod]["info"]:
obset_products[fprod]["info"] = prod_info
# Create a filter product object for this instrument/detector
# FilterProduct(prop_id, obset_id, instrument, detector,
# filename, filters, filetype, log_level)
filt_obj = SkyCellProduct(
str(0),
str(0),
prod_list[1],
prod_list[2],
"empty_aperture", #aperture
prod_list[0],
layer,
ftype,
log_level,
)
# Append exposure object to the list of exposure objects for this specific filter product object
filt_obj.add_member(sep_obj)
# Populate filter product dictionary with input filename
obset_products[fprod]["files"].append(filename[1])
# Check to see whether an additional layer with a different plate scale should be generated
# Primarily for WFC3/IR fine vs coarse layers
if pscale == "coarse":
fprod_fine = FP_STR.format(filt_indx + 1)
filt_indx_inc = 2
# Generate 'fine' layer as well
prod_info_fine = prod_info.replace("coarse", "fine")
layer_fine = (layer[0], "fine") + layer[2:]
if fprod_fine not in obset_products:
obset_products[fprod_fine] = {
"info": prod_info_fine,
"files": [],
}
# Create a filter product object for this instrument/detector
# FilterProduct(prop_id, obset_id, instrument, detector,
# filename, filters, filetype, log_level)
filt_obj_fine = SkyCellProduct(
str(0),
str(0),
prod_list[1],
prod_list[2],
"empty_aperture", #aperture
prod_list[0],
layer_fine,
ftype,
log_level,
)
obset_products[fprod_fine]["files"].append(filename[1])
filt_obj_fine.add_member(sep_obj)
filt_indx += filt_indx_inc
# Add this filter object to the list for creating that layer BUT
# ONLY if there are new exposures being processed for this layer,
#
if filt_obj.new_to_layer > 0:
# Append filter object to the list of filter objects for this specific total product object
log1 = "Attach the sky cell layer object {}"
log2 = "to its associated total product object {}/{}."
log.debug(
" ".join([log1, log2]).format(
filt_obj.filters, filt_obj.instrument, filt_obj.detector
)
)
filt_obj.add_all_mvm_exposures_list(all_mvm_exposures)
if pscale == "coarse":
filt_obj_fine.add_all_mvm_exposures_list(all_mvm_exposures)
# For MVM processing there are no TotalProduct objects, and the TotalProduct list is a list
# comprised of the SkyCellProduct objects which are equivalent to the FilterProduct objects
# in SVM processing. The need here is to gather all the SkyCellProduct objects, whether they
# are "coarse" or "fine", into a list.
tdp_list.append(filt_obj)
if pscale == "coarse":
tdp_list.append(filt_obj_fine)
# Done... return dict and object product list
return obset_products, tdp_list
# -----------------------------------------------------------------------------
# Utility Functions
#
[docs]
def parse_obset_tree(det_tree, log_level):
"""Convert tree into products
Tree generated by ``create_row_info()`` will always have the following
levels:
* detector
* filters
* exposure
Each exposure will have an entry dict with keys 'info' and 'filename'.
Products created will be:
* total detection product per detector
* filter products per detector
* single exposure product
* grism/prism single exposure product [only if Grism/Prism exposures]
Grism/Prism images were added to the "doProcess" list after most of this
code was developed as they need to have the same Primary WCS as the direct
images from the same detector in the visit. These images are deliberately
not added to the output obset_products as they will not be drizzled. They
are added as individual GrismExposureProduct objects to the TotalProduct only.
The GrismExposureProduct list in the TotalProduct is separate and distinct
from the ExposureProduct list of direct images.
"""
log.setLevel(log_level)
# Initialize products dict
obset_products = {}
# For each level, define a product, starting with the detector used...
prev_det_indx = 0
det_indx = 0
filt_indx = 0
sep_indx = 0
tdp_list = []
# Setup products for each detector used
for filt_tree in det_tree.values():
totprod = TDP_STR.format(det_indx)
obset_products[totprod] = {"info": "", "files": []}
det_indx += 1
# Find all filters used...
for filter_files in filt_tree.values():
# Use this to create and populate filter product dictionary entry
fprod = FP_STR.format(filt_indx)
obset_products[fprod] = {"info": "", "files": []}
filt_indx += 1
# Populate single exposure dictionary entry now as well
filter_members, filetype = select_common_filetype(filter_files)
for filename, is_member in zip(filter_files, filter_members):
if det_indx != prev_det_indx:
prev_det_indx = det_indx
if not is_member:
# This logic should get the opposite filetype from the
# value returned for the entire filter (at least that is the plan)
exp_filetype = "drz" if filetype == "drc" else "drc"
else:
exp_filetype = filetype
# Generate the full product dictionary information string:
# proposal_id, obset_id, instrument, detector, ipppssoot, filter, and filetype
# filename = ('11515 01 WFC3 UVIS IACS01T9Q F200LP', 'iacs01t9q_flt.fits')
exp_prod_info = (filename[0] + " " + exp_filetype).lower()
# Set up the single exposure product dictionary
sep = SEP_STR.format(sep_indx)
obset_products[sep] = {"info": exp_prod_info, "files": [filename[1]]}
# Increment single exposure index
sep_indx += 1
# Create a single exposure product object
# The GrismExposureProduct is only an attribute of the TotalProduct.
prod_list = exp_prod_info.split(" ")
# prod_list is 0: proposal_id, 1:observation_id, 2:instrument, 3:detector,
# 4:aperture_from_poller, 5:filename, 6:filters, 7:filetype
# The prod_list[6] is the filter - use this information to distinguish between
# a direct exposure for drizzling (ExposureProduct) and an exposure
# (GrismExposureProduct) which is carried along (Grism/Prism) to make analysis
# easier for the user by having the same WCS in both the direct and
# Grism/Prism products.
# Determine if this image is a Grism/Prism or a nominal direct exposure
is_grism = False
if (
prod_list[6].lower().find("g") != -1
or prod_list[6].lower().find("pr") != -1
):
is_grism = True
filt_indx -= 1
grism_sep_obj = GrismExposureProduct(
prod_list[0], # prop_id
prod_list[1], # obset_id
prod_list[2], # instrument
prod_list[3], # detector
prod_list[4], # aperture_from_poller
filename[1], # filename
prod_list[6], # filters
prod_list[7], # filetype
log_level, # log_level
)
else:
sep_obj = ExposureProduct(
prod_list[0],
prod_list[1],
prod_list[2],
prod_list[3],
prod_list[4],
filename[1],
prod_list[6],
prod_list[7],
log_level,
)
# Now that we have defined the ExposureProduct for this input exposure,
# do not include it any total or filter product.
if not is_member:
continue
prod_info = (filename[0] + " " + filetype).lower()
# Redefine for the filter and total product definitions
# This will insure the correct product type (DRZ vs DRC) gets passed along
prod_list = prod_info.split(" ")
# Set up the filter product dictionary and create a filter product object
# Initialize `info` key for this filter product dictionary
if not is_grism:
if not obset_products[fprod]["info"]:
obset_products[fprod]["info"] = prod_info
# Create a filter product object for this instrument/detector
filt_obj = FilterProduct(
prod_list[0],
prod_list[1],
prod_list[2],
prod_list[3],
prod_list[4],
prod_list[5],
prod_list[6],
prod_list[7],
log_level,
)
# Append exposure object to the list of exposure objects for this specific filter product object
filt_obj.add_member(sep_obj)
# Populate filter product dictionary with input filename
obset_products[fprod]["files"].append(filename[1])
# Set up the total detection product dictionary and create a total detection product object
# Initialize `info` key for total detection product
if not obset_products[totprod]["info"]:
obset_products[totprod]["info"] = prod_info
# Create a total detection product object for this instrument/detector
tdp_obj = TotalProduct(
prod_list[0],
prod_list[1],
prod_list[2],
prod_list[3],
prod_list[4],
prod_list[5],
prod_list[7],
log_level,
)
if not is_grism:
# Append exposure object to the list of exposure objects for this specific total detection product
tdp_obj.add_member(sep_obj)
# Populate total detection product dictionary with input filename
obset_products[totprod]["files"].append(filename[1])
# Append filter object to the list of filter objects for this specific total product object
log.debug(
"Attach the filter object {} to its associated total detection product object {}/{}.".format(
filt_obj.filters, tdp_obj.instrument, tdp_obj.detector
)
)
# Identify what exposures should use single-image CR identification algorithm
is_ccd = not (
filt_obj.instrument.lower() == "wfc3"
and filt_obj.detector.lower() == "ir"
)
if is_ccd and len(filt_obj.edp_list) == 1:
for e in filt_obj.edp_list:
e.crclean = True
elif is_grism:
tdp_obj.add_grism_member(grism_sep_obj)
if not is_grism:
tdp_obj.add_product(filt_obj)
# Add the total product object to the list of TotalProducts
tdp_list.append(tdp_obj)
# If the total product object (TDP) has Grism/Prism members, it MUST have direct exposure
# members too, or the TDP is not valid. If the TDP is not valid, it is still necessary
# to generate a log file for proper tracking of the processing, so just delete
# the disk SVM FLT/FLC files for the Grism/Prism images. Allow the the TDP to be passed
# back to run_hap_processing (hapsequencer.py) where the validity of the product can be handled.
for tdp in tdp_list:
if tdp.grism_edp_list and not tdp.edp_list:
for item in tdp.grism_edp_list:
try:
os.remove(item.full_filename)
except OSError:
pass
# Done... return dict and object product list
return obset_products, tdp_list
# ------------------------------------------------------------------------------
def select_common_filetype(filter_files):
"""Select common filter type and set of filter_files for input list of filenames.
Parameters
-----------
filter_files : list
List of tuples describing input files for the filter product
Returns
-------
filter_members : list
Mask indicating which input files are members correspond to the output product type
prod_type : str
String corresponding to the suffix (DRZ vs DRC) to be used for the output product based
on the common filter type (FLT vs FLC)
"""
filetypes = np.array([f[1][:-5].split("_")[-1] for f in filter_files])
# Check whether or not all input files for this filter product have the same filetype (suffix)
filetypes_flt = filetypes == "flt"
filetypes_flc = filetypes == "flc"
num_flc = len(np.where(filetypes_flc)[0])
# determine whether we have FLC images in set or not
# and set the filter_type accordingly
filter_type = "flc" if num_flc > 0 else "flt"
filter_members = filetypes_flc if filter_type == "flc" else filetypes_flt
# Return the updated/cleaned list of input files,
# along with the suffix for the files in the input list for this filter product
prod_type = "drc" if filter_type == "flc" else "drz"
return filter_members, prod_type
# ------------------------------------------------------------------------------
def define_exp_layers(obset_table, method="hard", exp_limit=None):
"""Determine what exposures will be grouped into the same layer of a sky cell"""
# Add 'exp_layer' column to table
if method not in SUPPORTED_EXP_METHODS:
raise ValueError(
"Please use a supported method: {}".format(SUPPORTED_EXP_METHODS)
)
if method == "kmeans":
# Use pre-defined limits on exposure times for clusters
exptime_range = obset_table["exptime"].max() / obset_table["exptime"].min()
if exp_limit is not None and exptime_range > exp_limit:
kmeans = KMeans(n_clusters=3, random_state=0).fit(obset_table["exptime"])
# Sort and identify the clusters in increasing duration
centers = kmeans.cluster_centers_.reshape(1, -1)[0].argsort()
exp_layer = [centers[l] for l in kmeans.labels_]
else:
exp_layer = [None] * len(obset_table)
if method == "all":
exp_layer = [None] * len(obset_table)
else:
# Use pre-defined limits for selecting layer members
# Subtraction by 1 puts the range from 0-2 to be consistent with KMeans
exp_layer = np.digitize(obset_table["exptime"], EXP_LIMITS) - 1
# Add column to the table as labelled values ('short', 'med', 'long', 'all')
obset_table["exp_layer"] = [EXP_LABELS[e].upper() for e in exp_layer]
# ------------------------------------------------------------------------------
def determine_filter_name(raw_filter):
"""
Generate the final filter name to be used for an observation.
Parameters
----------
raw_filter : string
filters component one exposure from an input observation visit
Returns
-------
filter_name : string
final filter name
If the raw_filter is actually a combination of two filter names, as
can be true for instruments with two filter wheels, then generate
a new filter string according the following rules:
- If one filter name is 'clear*', then use the other filter name.
- If both filter names are 'clear*', then use 'clear'.
- If there are two filters in use, then use 'filter1-filter2'.
- If one filter is a polarizer ('pol*'), then always put the polarizer
name second (e.g., 'f606w-pol60').
- If one filter is 'n/a' (as is the case for SBC), then use the other filter
name. NOTE: At this time (February 2020) the filter names for the SBC
detector of ACS have malformed names (e.g., F140LP;N/A;F140LP) where the
final token delimited by the ";" should not be present. Remove the final
delimiter and entry. Unlike the other ACS detectors, SBC only uses a
single filter wheel.
- NOTE: There should always be at least one filter name provided to
this routine or this input is invalid.
"""
raw_filter = raw_filter.lower()
# There might be multiple filters, so split the filter names into a list
# and only retain the first two entries. SBC has a bogus third entry.
filter_list = raw_filter.split(";")[0:2]
output_filter_list = []
for filt in filter_list:
if not any(x in filt for x in ["clear", "n/a"]):
output_filter_list.append(filt)
if not output_filter_list:
output_filter_list = ["clear"]
else:
if output_filter_list[0].startswith("pol"):
output_filter_list.reverse()
delimiter = "-"
filter_name = delimiter.join(output_filter_list).rstrip(delimiter)
return filter_name
# ------------------------------------------------------------------------------
[docs]
def build_poller_table(
input: str,
log_level,
all_mvm_exposures=[],
poller_type="svm",
include_small=True,
only_cte=False,
):
"""Create a poller file from dataset names for either SMV or MVM processing. Information is either gathered
from the poller file or by using the filename to open the file and pulling information from the header keywords.
The code treats WFPC2 differently, by uses both approaches. For WFPC2, We use simple poller files with a second column
that includes the aperture. The code gathers the rest of the relevant informaiton from the header keywords.
Parameters
-----------
input : str, list
Filename with list of dataset names, or just a Python list of dataset names, provided by the user.
log_level : int
The desired level of verboseness in the log statements displayed on the screen and written to the .
log file.
all_mvm_exposures : str, list
List of all the MVM FLT/FLC exposure filenames in the list. This will include the filenames
which are dropped because they have been processed previously.
poller_type : str, optional
The type of poller file being processed. Either 'svm' for single visit mosaic, or 'mvm' for
multi-visit mosaic. Unless explicitly specified, the default value is 'svm'.
Returns
--------
poller_table : Table
Astropy table object with the same columns as a poller file.
"""
log.setLevel(log_level)
is_poller_file = False
# Check the input file is not empty
if not isinstance(input, list) and not os.path.getsize(input):
log.error(
"Input poller manifest file, {}, is empty - processing is exiting.".format(
input
)
)
sys.exit(0)
if poller_type == "mvm":
poller_colnames = MVM_POLLER_COLNAMES
poller_dtype = MVM_POLLER_DTYPE
else:
poller_colnames = POLLER_COLNAMES
poller_dtype = POLLER_DTYPE
datasets = []
# limit column string types to minimum length formats e.g. str8, str11, etc.
obs_converters = {"col4": [ascii.convert_numpy(np.str_)]}
if isinstance(input, str):
input_table = ascii.read(input, format="no_header", converters=obs_converters)
if len(input_table.columns) == 1:
input_table.columns[0].name = "filename"
input_table["aperture"] = "empty_aperture"
poller_dtype += ("str",)
is_poller_file = False # gets important keywords from file headers instead of poller file
# unique logic to collect WFPC2 aperture data from poller file
elif len(input_table.columns) == 2:
input_table.columns[0].name = "filename"
input_table.columns[1].name = "aperture"
# add dtype for aperture column
poller_dtype += ("str",)
is_poller_file = False
elif len(input_table.columns) == len(poller_colnames):
# We were provided a poller file
# Now assign column names to table
for i, colname in enumerate(poller_colnames):
input_table.columns[i].name = colname
# Convert to a string column, instead of int64
input_table["obset_id"] = input_table["obset_id"].astype(np.str_)
# Convert string column into a Bool column
# The input poller file reports True if it has been reprocessed.
# This code interprets that as False since it is NOT new, so the code
# inverts the meaning from the pipeline poller file.
if poller_type == "mvm":
# Translate new format back to old format ("NEW" -> 0 and "OLD" -> 1)
poller_table_mapping = {"NEW": 0, "OLD": 1}
reverse_table_mapping = {"0": "NEW", "1": "OLD"}
rows_to_drop = []
for tbl_ctr in range(0, len(input_table)):
all_mvm_exposures.append(input_table[tbl_ctr]["filename"])
if input_table[tbl_ctr]["skycell_new"].upper() in ["OLD", "NEW"]:
if input_table[tbl_ctr]["skycell_new"].upper() == "OLD":
rows_to_drop.append(tbl_ctr)
input_table[tbl_ctr]["skycell_new"] = poller_table_mapping[
input_table[tbl_ctr]["skycell_new"].upper()
]
elif input_table[tbl_ctr]["skycell_new"] in ["0", "1"]:
err_msg = "'{}' is an invalid skycell_new poller file value. (Legal values: 'NEW' or 'OLD'). Please use '{}' instead of '{}'. Exiting... ".format(
input_table[tbl_ctr]["skycell_new"],
reverse_table_mapping[input_table[tbl_ctr]["skycell_new"]],
input_table[tbl_ctr]["skycell_new"],
)
log.error(err_msg)
raise Exception(err_msg)
else:
err_msg = "'{}' is an invalid skycell_new poller file value. (Legal values: 'NEW' or 'OLD'). Exiting... ".format(
input_table[tbl_ctr]["skycell_new"]
)
log.error(err_msg)
raise Exception(err_msg)
# Apply logic for ignoring additional data, based on which environment variables are defined
# when defining what output SkyCell layers to generate from a poller file
# Need to ignore non-CTE-corrected UVIS data
# Note: ignoring ACS/HRC and ACS/SBC from input list for MVM processing is done in analyze.analyze_wrapper().
cte_flag = input_table[tbl_ctr]["filename"][-6] == "c"
# for WFC3 data, if UVIS and not CTE-corrected, flag for removal from processing
wf3_cte = (
input_table[tbl_ctr]["detector"].upper() == "UVIS"
and not cte_flag
)
# for ACS data, if WFC and not CTE-corrected, flag for removal from processing
acs_cte = (
input_table[tbl_ctr]["detector"].upper() == "WFC"
and not cte_flag
)
# if only CTE data is requested, and either acs_cte or wf3_cte is True, then remove from processing
if only_cte and (wf3_cte or acs_cte):
rows_to_drop.append(tbl_ctr)
# Omit input images listed as "OLD" in the poller file from processing
if len(rows_to_drop) == len(input_table):
err_msg = "All images have already been MVM processed. No new MVM processing is needed. Exiting..."
log.error(err_msg)
sys.exit(analyze.Ret_code.NO_VIABLE_DATA.value)
elif len(rows_to_drop) == 0:
log.info(
"None of the input images have previously been MVM processed. Proceeding with MVM processing of all input images... "
)
else:
log.info(
"The following {} input image(s) have already been MVM processed and will be omitted from MVM processing:".format(
len(rows_to_drop)
)
)
for tbl_idx in rows_to_drop:
log.info(" {}".format(input_table[tbl_idx]["filename"]))
input_table.remove_rows(rows_to_drop)
input_table["skycell_new"] = [
int(not BOOL_STR_DICT[str(val).upper()])
for val in input_table["skycell_new"]
]
is_poller_file = True
# Check that each file listed in the poller file exists in the current working directory. If a
# file is missing, copy it over from the path specified in the poller file. Failing that, raise
# an exception and exit.
for table_line in input_table:
if os.path.exists(table_line["filename"]):
log.info(
f"Input image {table_line['filename']} found in current working directory."
)
elif os.path.exists(table_line["pathname"]):
log.info(
f"Input image {table_line['filename']} not found in current working directory. However, it was found in the path specified in the poller file."
)
shutil.copy(table_line["pathname"], os.getcwd())
log.info(
f"Input image {table_line['pathname']} copied to current working directory."
)
else:
log.error(
f"Input image {table_line['filename']} not found in current working directory."
)
log.error(
f"Archived input image {table_line['pathname']} not found."
)
err_msg = f"Input image {table_line['filename']} missing from current working directory and from the path specified in the poller file. Exiting... "
log.error(err_msg)
raise Exception(err_msg)
elif (poller_type == "mvm") and (
len(input_table.columns) != len(poller_colnames)
):
log.error(
f"MVMs should use full poller files with {len(poller_colnames)} columns."
)
err_msg = f"Full poller files should have {len(poller_colnames)} columns. Exiting... "
log.error(err_msg)
raise Exception(err_msg)
# input is string with unexpected number of columns
else:
log.error(
f"Poller file has an unexpected number of columns, code expects either 1, 2, or {len(poller_colnames)} but received: {len(input_table.columns)}"
)
raise ValueError
# Since a poller file was the input, it is assumed all the input
# data is in the locale directory so just collect the filenames.
# datasets = input_table[input_table.colnames[0]].tolist()
filenames = list(input_table.columns[0])
# If input is a list of filenames
elif isinstance(input, list):
filenames = input
input_table = None
else:
id = "[poller_utils.build_poller_table] "
log.error(
"{}: Input {} not supported as input for processing.".format(id, input)
)
raise ValueError
# At this point, we have a poller file or a list of filenames. If the latter, then any individual
# filename can be a singleton or an association name. We need to get the full list of actual
# filenames from the association name.
if not is_poller_file:
for filename in filenames:
# Look for dataset in local directory.
if "asn" in filename or not os.path.exists(filename):
# This retrieval will NOT overwrite any ASN members already on local disk
# Return value will still be list of all members
files = aqutils.retrieve_observation(
[filename[:9]], suffix=["FLC"], clobber=False
)
if len(files) == 0:
log.error("Filename {} not found in archive!!".format(filename))
log.error("Please provide ASN filename instead!")
raise ValueError
else:
files = [filename]
datasets += files
all_mvm_exposures = datasets
else:
datasets = filenames
# Each image, whether from a poller file or from an input list needs to be
# analyzed to ensure it is viable for drizzle processing. If the image is not
# viable, it should not be included in the output "poller" table.
# NOTE: The "all_mvm_exposures" variable does not need to be checked for "usable" datasets
# as it is used to create a WCS/footprint to accommodate *all* exposures in an MVM visit.
usable_datasets, usable_dataset_index, return_code = analyze.analyze_wrapper(
datasets, use_sbchrc=include_small, type=poller_type
)
if not usable_datasets:
log.warning(
"No usable images in poller file or input list for drizzling. The processing of this data is ending."
)
sys.exit(return_code)
else:
log.info(
"There are {} usable images identified in the poller file for processing.".format(
len(usable_datasets)
)
)
cols = OrderedDict()
for cname in poller_colnames:
cols[cname] = []
cols["filename"] = usable_datasets
if input_table:
if "aperture" in input_table.colnames:
aperture_all_datasets = input_table["aperture"].tolist()
aperture_of_useable_dataset = np.array(aperture_all_datasets)[usable_dataset_index]
cols["aperture"] = aperture_of_useable_dataset
else:
cols["aperture"] = ["empty_aperture"] * len(usable_datasets)
add_col = Column(
["empty_aperture"] * len(input_table), name="aperture", dtype="str"
)
input_table.add_column(add_col, index=7)
poller_dtype += ("str",)
else:
raise ValueError("Input table is empty. Exiting...")
# If MVM processing and a poller file is the input, this implies there is
# only one skycell of interest for all the listed filenames in the poller
# file. Establish the WCS, but no need for discovery of overlapping skycells
# as would be the case for an input list of filenames.
if poller_type == "mvm" and is_poller_file:
pipeline_skycell_id = input_table[0]["skycell_id"]
scells = {}
skycell_obj = cell_utils.SkyCell.from_name(pipeline_skycell_id)
skycell_obj.members = filenames
scells[pipeline_skycell_id] = skycell_obj
scell_files = cell_utils.interpret_scells(scells)
log.debug("MVM and poller file. scell_files: {}".format(scell_files))
# If processing a list of files, evaluate each input dataset for the information needed
# for the poller file
if not is_poller_file:
for d in usable_datasets:
with fits.open(d) as dhdu:
hdr = dhdu[0].header
cols["program_id"].append(d[1:4].upper())
cols["obset_id"].append(str(d[4:6]))
cols["proposal_id"].append(hdr["proposid"])
cols["exptime"].append(hdr["exptime"])
cols["detector"].append(hdr["detector"])
cols["pathname"].append(os.path.abspath(d))
# process filter names
if d[0] == "j": # ACS data
filters = processing_utils.get_acs_filters(dhdu, all=True)
elif d[0] == "i":
filters = hdr["filter"]
elif d[0] == "u":
filters = processing_utils.get_wfpc2_filters(dhdu, all=True)
cols["filters"].append(filters)
if poller_type == "mvm":
# interpret_scells returns:
# {'filename1':{'<sky cell id1>': SkyCell1,
# '<sky cell id2>':SkyCell2,
# 'id': "<sky cell id1>;<sky cell id2>"},
# 'filename2': ...}
# This preserves 1 entry per filename, while providing info on
# multiple SkyCell's for each filename as appropriate.
#
cols["skycell_id"] = [
scell_files[fname]["id"] for fname in cols["filename"]
]
cols["skycell_new"] = [1] * len(cols["filename"])
#
# Build output table
#
poller_data = [col for col in cols.values()]
poller_names = [colname for colname in cols]
poller_table = Table(data=poller_data, names=poller_names, dtype=poller_dtype)
# The input was a poller file, so just keep the viable data rows for output
else:
good_rows = []
for d in usable_datasets:
for i, old_row in enumerate(input_table):
if d == input_table["filename"][i]:
good_rows.append(old_row)
# This table contains the pipeline specified skycell_id for each row
# which should be the same value in every row
poller_table = Table(
rows=good_rows, names=input_table.colnames, dtype=poller_dtype
)
#
# If 'mvm' poller file, expand any multiple skycell entries into separate rows
#
if poller_type == "mvm":
# A new row will need to be added for each additional SkyCell that the
# file overlaps...
#
poller_table["skycell_obj"] = [None] * len(poller_table)
#
# Make a copy of the original poller_table
#
new_poller_table = poller_table[poller_table["filename"] != None]
for name in scell_files:
for scell_id in scell_files[name]:
if scell_id != "id":
scell_obj = scell_files[name][scell_id]
for indx, row in enumerate(poller_table):
if row["filename"] != name:
continue
if new_poller_table[indx]["skycell_obj"] is None:
new_poller_table[indx]["skycell_obj"] = scell_obj
new_poller_table[indx]["skycell_id"] = scell_id
else:
poller_rows = poller_table[poller_table["filename"] == name]
sobj0 = poller_rows["skycell_obj"][0]
# Select only 1 row regardless of how many we have already
# added for this filename (in case file overlapped more than
# 2 sky cells at once).
poller_row = poller_rows[
poller_rows["skycell_obj"] == sobj0
]
# make copy of row for this filename
# assign updated values to skycell columns
poller_row["skycell_id"] = scell_id
poller_row["skycell_obj"] = scell_obj
# append new row to table
new_poller_table.add_row(poller_row[0])
# All the work has been done to setup the poller table which accommodates
# both a pipeline supplied poller file, as well as a list of filenames. The
# table contains all sky cells that overlap with the input files. However, the
# pipeline supplied poller file originally specified the single skycell to be
# processed under "pipeline" conditions. If this invocation is by a poller file,
# then trim the table to contain only the rows which match the sky cell specified
# in the poller file.
pipeline_skycell_id = poller_table[0]["skycell_id"]
new_poller_table = new_poller_table[
new_poller_table["skycell_id"] == pipeline_skycell_id
]
if not new_poller_table:
log.error(
"No sky cell found which matches the sky cell specified in the poller file {}.".format(
pipeline_skycell_id
)
)
sys.exit(0)
poller_table = new_poller_table
return poller_table
# ------------------------------------------------------------------------------
def sort_poller_table(obset_table):
"""Sort the input table by photflam and exposure time.
The alignment of data within a single visit is first done in a relative way,
successively adding images to the stack within the visit. The next step is
to achieve absolute alignment of the stack of images to the GAIA catalog.
One scheme to obtain the largest number of quality (unsaturated) fiducials/sources,
is to sort the images first according to their inverse sensitivity (photflam)
and then according to their exposure time.
The obset_table has the following columns:
filename, proposal_id, program_id, obset_id, exptime, filters, detector, and pathname
Parameters
----------
obset_table: Astropy table
Astropy table object with columns which map to the original poller file
Returns
-------
updated_obset_table: Astropy table
The sorted version of the input Astropy table
"""
# Create a copy of the input table and add the photflam column with a filler value
expanded_obset_table = Table(obset_table)
expanded_obset_table["flam"] = -999999.0
for row in expanded_obset_table:
input_file = row[expanded_obset_table.colnames[0]]
# Open the specified FITS file
h0 = getheader(input_file, 0)
# Need to get the instrument and detector keywords in order to determine
# where to look for the various necessary keywords (i.e., primary or
# extension)
instrument = h0["instrume"].upper()
detector = h0["detector"].upper()
# HST IMAGE
# photflam: inverse sensitivity, ergs/s-/cm2-/A-1 for 1 electron/s
# PHOTFLAM keyword is found in each science extension for the currently
# supported instruments (as of 01 Jan 2020), except for WFC3/IR where
# PHOTFLAM is in the Primary.
#
# Although the PHOTFLAM keyword is science extension-dependent,
# the differences in values is so small as to not be relevant in
# this particular context.
if instrument == "WFC3" and detector == "IR":
photflam = h0["photflam"]
else:
h1 = getheader(input_file, "sci", 1)
photflam = h1["photflam"]
row["flam"] = photflam
# Determine the rank order the data with a primary key of photflam and a secondary key
# of exposure time (in seconds). The primary and secondary keys both need
# to be sorted in descending order. Use the rank to sort
# the original input table for output.
# Unfortunately, there are cases where the default works better; namely,
# fields with few point sources which are bright that are saturated in the
# longer exposure time images (like j8cw03 and j8cw53).
# rank = np.lexsort((-expanded_obset_table['flam'], -expanded_obset_table['exptime']))
# Original implementation:
# rank = np.lexsort((expanded_obset_table['exptime'], -expanded_obset_table['flam']))
rank = np.lexsort((-expanded_obset_table["flam"], expanded_obset_table["exptime"]))
updated_obset_table = obset_table[rank]
return updated_obset_table
# ------------------------------------------------------------------------------
def add_primary_fits_header_as_attr(hap_obj, log_level=logutil.logging.NOTSET):
"""create object attribute containing image primary header info
Parameters
----------
hap_obj : drizzlepac.haputils.Product.TotalProduct, drizzlepac.haputils.Product.FilterProduct, or
drizzlepac.haputils.Product.ExposureProduct, depending on input
object to update
log_level : int, optional.
The desired level of verboseness in the log statements displayed on the screen and written to the
.log file.
Returns
-------
hap_obj : drizzlepac.haputils.Product.TotalProduct, drizzlepac.haputils.Product.FilterProduct, or
drizzlepac.haputils.Product.ExposureProduct, depending on input
updated version of input object
"""
if os.path.exists(hap_obj.drizzle_filename):
file_name = hap_obj.drizzle_filename
else:
file_name = hap_obj.full_filename
hap_obj.primary_header = fits.getheader(file_name)
log.info("added primary header info from file {} to {}".format(file_name, hap_obj))
return hap_obj