
"""Utilities to interpret the pipeline poller obset information and generate product filenames

The function interpret_obset_input parses the file generated by the pipeline
poller and produces a tree listing of the output products.  The function
parse_obset_tree converts the tree into product categories.

"""
from collections import OrderedDict
import copy
import os
import shutil
import sys

import numpy as np

from sklearn.cluster import KMeans

from stsci.tools import logutil

from astropy.io import fits
from astropy.io import ascii
from astropy.io.fits import getheader
from astropy.table import Table, Column
from drizzlepac import util
from drizzlepac.haputils.product import (
    ExposureProduct,
    FilterProduct,
    TotalProduct,
    GrismExposureProduct,
)
from drizzlepac.haputils.product import SkyCellProduct, SkyCellExposure
from . import analyze
from . import astroquery_utils as aqutils
from . import processing_utils
from . import cell_utils

# Define information/formatted strings to be included in output dict
SEP_STR = "single exposure product {:02d}"
FP_STR = "filter product {:02d}"
TDP_STR = "total detection product {:02d}"

# Define the mapping between the first character of the filename and the associated instrument
INSTRUMENT_DICT = {
    "i": "WFC3",
    "j": "ACS",
    "o": "STIS",
    "u": "WFPC2",
    "x": "FOC",
    "w": "WFPC",
}
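
# Illustrative lookup sketch: the first character of an exposure's rootname
# identifies the instrument (the filename below follows the docstring examples):
# >>> INSTRUMENT_DICT["ib4606c5q_flc.fits"[0]]
# 'WFC3'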
POLLER_COLNAMES = [
    "filename",
    "proposal_id",
    "program_id",
    "obset_id",
    "exptime",
    "filters",
    "detector",
    "pathname",
]
POLLER_DTYPE = ("str", "int", "str", "str", "float", "object", "str", "str")

MVM_POLLER_COLNAMES = [
    "filename",
    "proposal_id",
    "program_id",
    "obset_id",
    "exptime",
    "filters",
    "detector",
    "skycell_id",
    "skycell_new",
    "pathname",
]
MVM_POLLER_DTYPE = (
    "str",
    "int",
    "str",
    "str",
    "float",
    "object",
    "str",
    "str",
    "str",
    "str",
)

BOOL_STR_DICT = {
    "TRUE": True,
    "FALSE": False,
    "T": True,
    "F": False,
    "1": True,
    "0": False,
}

EXP_LABELS = {2: "long", 1: "med", 0: "short", None: "all"}
EXP_LIMITS = [0, 15, 120]
SUPPORTED_EXP_METHODS = {"kmeans", "hard", "all"}
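
# Illustrative sketch of the "hard" exposure binning used by define_exp_layers()
# below (the exposure times in seconds are hypothetical):
# >>> import numpy as np
# >>> np.digitize([5.0, 60.0, 500.0], EXP_LIMITS) - 1
# array([0, 1, 2])
# >>> [EXP_LABELS[e] for e in np.digitize([5.0, 60.0, 500.0], EXP_LIMITS) - 1]
# ['short', 'med', 'long']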

__taskname__ = "poller_utils"

MSG_DATEFMT = "%Y%j%H%M%S"
SPLUNK_MSG_FORMAT = "%(asctime)s %(levelname)s src=%(name)s- %(message)s"
log = logutil.create_logger(
    __name__,
    level=logutil.logging.NOTSET,
    stream=sys.stdout,
    format=SPLUNK_MSG_FORMAT,
    datefmt=MSG_DATEFMT,
)


# -----------------------------------------------------------------------------
# Single Visit Processing Functions
#
def interpret_obset_input(results: str, log_level):
    """Parse the file generated by the pipeline poller and produce a tree
    listing of the output products.

    Parameters
    ----------
    results : str or list
        Input poller file name or Python list of dataset names to be processed as a single
        visit.  Dataset names have to be either the filename of a singleton (non-associated
        exposure) or the name of an ASN file (e.g., jabc01010_asn.fits).

    log_level : int
        The desired level of verboseness in the log statements displayed on the screen
        and written to the .log file.

    Returns
    -------
    obset_dict : dict
        Dictionary of obset objects

    tdp_list : list
        List of total detection products

    Notes
    -----
    Interpret the database query for a given obset to prepare the returned
    values for use in generating the names of all the expected output products.

    Input will have formatted rows of one of the following three options:

    1) Full poller file

    .. code-block::

        ib4606c5q_flc.fits,11665,B46,06,1.0,F555W,UVIS,/foo/bar/ib4606c5q_flc.fits

    2) Simpler poller file (list); other info taken from file header keywords

    .. code-block::

        ib4606c5q_flc.fits

    3) Poller file for updating the WFPC2 SVM aperture keyword; it is important
       that there are no spaces within the poller aperture keyword(s)

    .. code-block::

        ib4606c5q_flc.fits, PC1-FIX;F160BN15

    Full poller files contain filename, proposal_id, program_id, obset_id, exptime,
    filters, detector, and pathname.

    The output dict will have the format (as needed by further code for creating
    the product filenames) of

    .. code-block::

        obs_info_dict["single exposure product 00"] = {"info": '11665 06 wfc3 uvis empty_aperture ib4606c5q f555w drc',
                                                       "files": ['ib4606c5q_flc.fits']}
        ...
        obs_info_dict["single exposure product 08"] = {"info": '11665 06 wfc3 ir empty_aperture ib4606clq f110w drz',
                                                       "files": ['ib4606clq_flt.fits']}

        obs_info_dict["filter product 00"] = {"info": '11665 06 wfc3 uvis empty_aperture ib4606c5q f555w drc',
                                              "files": ['ib4606c5q_flc.fits', 'ib4606c6q_flc.fits']}
        ...
        obs_info_dict["filter product 01"] = {"info": '11665 06 wfc3 ir empty_aperture ib4606cmq f160w drz',
                                              "files": ['ib4606cmq_flt.fits', 'ib4606crq_flt.fits']}

        obs_info_dict["total detection product 00"] = {"info": '11665 06 wfc3 uvis empty_aperture ib4606c5q f555w drc',
                                                       "files": ['ib4606c5q_flc.fits', 'ib4606c6q_flc.fits']}
        ...
        obs_info_dict["total detection product 01"] = {"info": '11665 06 wfc3 ir empty_aperture ib4606cmq f160w drz',
                                                       "files": ['ib4606cmq_flt.fits', 'ib4606crq_flt.fits']}

    The aperture keyword, which has a default value of 'empty_aperture', is filled
    in the case of WFPC2 observations and the use of the two-column poller format.
    """
    # set logging level to user-specified level
    log.setLevel(log_level)

    log.debug("Interpret the poller file for the observation set.")
    obset_table = build_poller_table(results, log_level)

    # Add INSTRUMENT column; the instrument is inferred from the first character
    # of the first filename
    instr = INSTRUMENT_DICT[obset_table["filename"][0][0]]
    obset_table.add_column(Column([instr] * len(obset_table)), name="instrument")

    # Sort the rows of the table in an effort to optimize the number of quality
    # sources found in the initial images
    obset_table = sort_poller_table(obset_table)
    log.debug("Sorted input:")
    log.debug(obset_table)
    log.debug("\n\n")

    # parse Table into a tree-like dict
    log.debug("Build the observation set tree.")
    obset_tree = build_obset_tree(obset_table)

    # Now create the output product objects
    log.debug(
        "Parse the observation set tree and create the exposure, filter, and total detection objects."
    )
    obset_dict, tdp_list = parse_obset_tree(obset_tree, log_level)

    # This little bit of code adds an attribute to single exposure objects that
    # is True if a given filter only contains one input (i.e., n_exp == 1)
    for tot_obj in tdp_list:
        for filt_obj in tot_obj.fdp_list:
            is_singleton = len(filt_obj.edp_list) == 1
            for edp_obj in filt_obj.edp_list:
                edp_obj.is_singleton = is_singleton

    return obset_dict, tdp_list
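

# Illustrative usage sketch for interpret_obset_input() (the poller filename is
# hypothetical; a real run needs the listed FLT/FLC files on disk):
# >>> from drizzlepac.haputils import poller_utils
# >>> obset_dict, tdp_list = poller_utils.interpret_obset_input(
# ...     "wfc3_visit06.out", logutil.logging.INFO)
# obset_dict maps labels such as 'filter product 00' to {'info': ..., 'files': [...]},
# while tdp_list holds one TotalProduct object per detector in the visit.
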
# Translate the database query on an obset into actionable lists of filenames
def build_obset_tree(obset_table):
    """Convert obset table into a tree listing all products to be created."""

    # Each product will consist of the appropriate string as the key and
    # a dict of 'info' and 'files' information

    # Start interpreting the obset table
    obset_tree = {}
    for row in obset_table:
        # Get some basic information from the first row - no need to check
        # for multiple instruments as data from different instruments will
        # not be combined.
        det = row["detector"]
        orig_filt = row["filters"]

        # Potentially need to manipulate the 'filters' string for instruments
        # with two filter wheels
        filt = determine_filter_name(orig_filt)
        row["filters"] = filt

        row_info, filename = create_row_info(row)

        # Initial population of the obset tree for this detector
        if det not in obset_tree:
            obset_tree[det] = {}
            obset_tree[det][filt] = [(row_info, filename)]
        else:
            det_node = obset_tree[det]
            if filt not in det_node:
                det_node[filt] = [(row_info, filename)]
            else:
                det_node[filt].append((row_info, filename))

    return obset_tree
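

# Illustrative shape of the tree returned by build_obset_tree() (the rootnames
# and filter are hypothetical, following the docstring examples):
# {'UVIS': {'f555w': [('11665 06 WFC3 UVIS EMPTY_APERTURE IB4606C5Q F555W', 'ib4606c5q_flc.fits'),
#                     ('11665 06 WFC3 UVIS EMPTY_APERTURE IB4606C6Q F555W', 'ib4606c6q_flc.fits')]}}
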
def create_row_info(row):
    """Build info string for a row from the obset table"""
    info_list = [
        str(row["proposal_id"]),
        "{}".format(row["obset_id"]),
        row["instrument"],
        row["detector"],
        row["aperture"],
        row["filename"][: row["filename"].find("_")],
        row["filters"],
    ]
    return " ".join(map(str.upper, info_list)), row["filename"]
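

# Illustrative result of create_row_info() for a single row (values are
# hypothetical, following the docstring examples):
# >>> create_row_info(row)
# ('11665 06 WFC3 UVIS EMPTY_APERTURE IB4606C5Q F555W', 'ib4606c5q_flc.fits')


# -----------------------------------------------------------------------------
# Multi-Visit Processing Functions
#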
def interpret_mvm_input(
    results,
    log_level,
    layer_method="all",
    exp_limit=2.0,
    user_table=None,
    include_small=True,
    only_cte=False,
):
    """
    Parameters
    ----------
    results : str or list
        Input poller file name or Python list of dataset names to be processed for a
        multi-visit.  Dataset names have to be either the filename of a singleton
        (non-associated exposure) or the name of an ASN file (e.g., jabc01010_asn.fits).

    log_level : int
        The desired level of verboseness in the log statements displayed on the screen
        and written to the .log file.

    layer_method : str
        Specify what type of interpretation should be done for the definition of each
        layer.  Options include: 'all', 'hard', 'kmeans'.  The value 'all' simply puts
        all exposures into a single layer regardless of exposure time.  The value
        'hard' relies on pre-defined limits for 'short', 'med', and 'long' exposure
        times.  The value 'kmeans' relies on K-means analysis for defining sub-layers
        based on exposure time.

    exp_limit : float, optional
        This specifies the ratio between long and short exposures that should be used
        to define each exposure time bin (short, med, long).  If None, all exposure
        times will be combined into a single bin (all).  Splitting of the bins gets
        computed using K-means clustering for 3 output bins.

    user_table : dict, optional
        User-supplied table generated using `~build_poller_table`.  This table may
        have been edited to meet the unique needs of the user for this custom
        processing run.

    include_small : bool, optional
        Specify whether or not to create products from ACS/HRC and ACS/SBC data.

    only_cte : bool, optional
        Specify whether or not to create products only using CTE-corrected data.

    Notes
    -----
    Interpret the contents of the MVM poller file for the values to use in generating
    the names of all the expected output products.

    Input will have the format of::

        ib4606c5q_flc.fits,11665,B46,06,1.0,F555W,UVIS,skycell-p2381x05y09,NEW,/ifs/archive/ops/hst/public/ib46/ib4606c5q/ib4606c5q_flc.fits

    which are::

        filename, proposal_id, program_id, obset_id, exptime, filters, detector,
        skycell-p<PPPP>x<XX>y<YY>, [OLD|NEW], pathname

    The SkyCell ID is included in this input information to allow for grouping of
    exposures into the same SkyCell layer based on filter, exptime, and year.

    The output dict will only have definitions for each defined sky cell layer to
    be either created or updated based on the input exposures being processed.
    There will be a layer defined for each unique combination of
    filter/exposure class/year.  An example would be::

        obs_info_dict["layer 00"] = {"info": '11665 06 wfc3 uvis f555w short 2011 drc',
                                     "files": ['ib4606c5q_flc.fits', 'ib4606cgq_flc.fits']}

        obs_info_dict["layer 01"] = {"info": '11665 06 wfc3 uvis f555w med 2011 drc',
                                     "files": ['ib4606c6q_flc.fits', 'ib4606cdq_flc.fits']}

        obs_info_dict["layer 02"] = {"info": '11665 06 wfc3 uvis f555w long 2011 drc',
                                     "files": ['ib4606c9q_flc.fits', 'ib4606ceq_flc.fits']}

    .. caution:: The input ("results") parameter was originally designed to accept
        a Python list of pipeline product dataset names (e.g., ib4606cdq_flc.fits)
        for MVM processing.  However, this was later modified: MVM processing needs
        the SVM FLT/FLC files as input.

    """
    # set logging level to user-specified level
    log.setLevel(log_level)

    all_mvm_exposures = []
    if not user_table:
        log.debug("Interpret the poller file for the observation set.")
        obset_table = build_poller_table(
            results,
            log_level,
            all_mvm_exposures=all_mvm_exposures,
            poller_type="mvm",
            include_small=include_small,
            only_cte=only_cte,
        )
    else:
        obset_table = copy.deepcopy(user_table)

    # Add INSTRUMENT column; the instrument is inferred from the first character
    # of each exposure's rootname
    instr = [
        INSTRUMENT_DICT[fname.split("_")[-2][0]] for fname in obset_table["filename"]
    ]
    obset_table.add_column(Column(instr, name="instrument"))

    # Add Date column
    # Uncomment this if we want to control the observation date in the same way
    # as the exposure times.
    if layer_method == "all":
        years = ["all"] * len(obset_table)
    else:
        years = [
            int(fits.getval(f, "date-obs").split("-")[0])
            for f in obset_table["filename"]
        ]
    obset_table.add_column(Column(years), name="year_layer")

    # Sort the rows of the table in an effort to optimize the number of quality
    # sources found in the initial images
    obset_table = sort_poller_table(obset_table)

    define_exp_layers(obset_table, method=layer_method, exp_limit=exp_limit)

    # parse Table into a tree-like dict
    log.debug("Build the multi-visit layers tree.")
    obset_tree = build_mvm_tree(obset_table)

    # Now create the output product objects
    log.debug(
        "Parse the observation set tree and create the exposure, filter, and total detection objects."
    )
    obset_dict, tdp_list = parse_mvm_tree(obset_tree, all_mvm_exposures, log_level)

    # Now we need to merge any pre-existing layer products with the new products.
    # This will add the exposure products from the pre-existing definitions of
    # the overlapping sky cell layers to those defined for the new exposures.
    #
    # obset_dict, tdp_list = merge_mvm_trees(obset_tree, layer_tree, log_level)

    # This little bit of code adds an attribute to single exposure objects that
    # is True if a given filter only contains one input (i.e., n_exp == 1)
    for filt_obj in tdp_list:
        is_singleton = len(filt_obj.edp_list) == 1
        for edp_obj in filt_obj.edp_list:
            edp_obj.is_singleton = is_singleton

    return obset_dict, tdp_list
# Translate the database query on an obset into actionable lists of filenames
def build_mvm_tree(obset_table):
    """Convert obset table into a tree listing all products to be created."""

    # Each product will consist of the appropriate string as the key and
    # a dict of 'info' and 'files' information

    # Start interpreting the obset table
    obset_tree = {}
    for row in obset_table:
        # Get some basic information from the first row - no need to check
        # for multiple instruments as data from different instruments will
        # not be combined.
        det = row["detector"]
        orig_filt = row["filters"]
        exp_layer = row["exp_layer"]
        year_layer = row["year_layer"]
        skycell = row["skycell_id"]

        # Potentially need to manipulate the 'filters' string for instruments
        # with two filter wheels
        filt = determine_filter_name(orig_filt)
        row["filters"] = filt

        # Define the key for the sky cell layer based on skycell and filter,
        # then optionally exposure times and/or observation date.
        layer = (skycell, filt, exp_layer, year_layer)
        # Ensure that no matter how 'exp_layer' gets turned off, it gets recognized.
        if exp_layer in ["ALL", "all", "None", "", " ", None]:
            # Turn off use of 'exp_layer' as an output layer for the sky cell.
            layer = None

        row_info, filename = create_mvm_info(row)
        # Define key for combined sky cell layer (all exposure times in one layer)
        row_info_all = row_info.split(" ")
        row_info_all[4] = "ALL"
        row_info_all = " ".join(row_info_all)
        layer_all = (skycell, filt, "all", year_layer)

        # Initial population of the obset tree for this detector
        if det not in obset_tree:
            obset_tree[det] = {}
            if layer:
                obset_tree[det][layer] = [(row_info, filename)]
            obset_tree[det][layer_all] = [(row_info_all, filename)]
        else:
            det_node = obset_tree[det]
            if layer:
                if layer not in det_node:
                    det_node[layer] = [(row_info, filename)]
                else:
                    det_node[layer].append((row_info, filename))
            if layer_all not in det_node:
                det_node[layer_all] = [(row_info_all, filename)]
            else:
                det_node[layer_all].append((row_info_all, filename))

    return obset_tree


def create_mvm_info(row):
    """Build info string for a row from the obset table"""
    info_list = [
        str(row["skycell_id"]),
        row["instrument"],
        row["detector"],
        row["filters"],
        str(row["exp_layer"]),
        str(row["year_layer"]),
        str(row["skycell_new"]),
    ]
    # info_list = [str(row['proposal_id']), "{}".format(row['obset_id']), row['instrument'],
    #              row['detector'], row['filters'], row['exp_layer'], row['year_layer']]
    return " ".join(map(str.upper, info_list)), row["filename"]


def parse_mvm_tree(det_tree, all_mvm_exposures, log_level):
    """Convert tree into products for sky cell layers

    Tree generated by `~create_mvm_info` will always have the following levels:

    * detector
        * filters
            * layer

    Each exposure will have an entry dict with keys 'info' and 'filename'.

    Products created will be:

    * a single sky cell layer product per 'detector/filter/layer' combination

    """
    log.setLevel(log_level)

    # Initialize products dict
    obset_products = {}

    # For each level, define a product, starting with the detector used...
    prev_det_indx = 0
    det_indx = 0
    filt_indx = 0
    filt_indx_inc = 1

    tdp_list = []

    # Set up products for each detector used
    #
    # det_tree = {'UVIS': {'f200lp': [('skycell_p1234_x01y01 WFC3 UVIS IACS01T9Q F200LP 1', 'iacs01t9q_flt.fits'),
    #                                 ('skycell_p1234_x01y01 WFC3 UVIS IACS01TBQ F200LP 1', 'iacs01tbq_flt.fits')]},
    #             'IR': {'f160w': [('skycell_p1234_x01y01 WFC3 IR IACS01T4Q F160W 1', 'iacs01t4q_flt.fits')]}}
    for filt_tree in det_tree.values():
        det_indx += 1

        # Find all filters used...
        # filt_tree = {'f200lp': [('skycell_p1234_x01y01 WFC3 UVIS IACS01T9Q F200LP 1', 'iacs01t9q_flt.fits'),
        #                         ('skycell_p1234_x01y01 WFC3 UVIS IACS01TBQ F200LP 1', 'iacs01tbq_flt.fits')]}
        for filter_files in filt_tree.values():
            # Use this to create and populate filter product dictionary entry
            fprod = FP_STR.format(filt_indx)
            obset_products[fprod] = {"info": "", "files": []}

            # Populate single exposure dictionary entry now as well
            # filename = ('skycell_p1234_x01y01 WFC3 UVIS IACS01T9Q F200LP 1', 'iacs01t9q_flt.fits')
            filter_members, filetype = select_common_filetype(filter_files)
            for filename, is_member in zip(filter_files, filter_members):
                if det_indx != prev_det_indx:
                    prev_det_indx = det_indx

                # If this input exposure does not have the same filetype (FLT/FLC) as the
                # rest of the images in this layer, ignore it.
                if not is_member:
                    continue

                # Generate the full product dictionary information string:
                #
                # [row['proposal_id'], row['obset_id'], row['instrument'],
                #  row['detector'], row['filters'], row['exp_layer'], row['year_layer']]
                prod_info = (filename[0] + " " + filetype).lower()
                # mvm prod_info = 'skycell_p1234_x01y01 wfc3 uvis f200lp all 2009 1 drz'

                # prod_list is:
                # skycell, instrument, detector, filters, exposure_layer, year_layer,
                # old_or_new, and filetype (CTE-corrected or not).
                prod_list = prod_info.split(" ")
                multi_scale = prod_list[2].upper() in ["IR", "PC"]
                pscale = "fine" if not multi_scale else "coarse"
                prod_info += " {:s}".format(pscale)

                if prod_list[5].strip() != "":
                    layer = (prod_list[3], pscale, prod_list[4], prod_list[5])
                else:
                    layer = (prod_list[3], pscale, prod_list[4])
                ftype = prod_list[-1]

                cellid = prod_list[0].split("-")[1]
                xindx = cellid.index("x")
                prop_id = cellid[1:xindx]  # projection cell number
                obset_id = cellid[xindx:]  # x and y skycell coordinates

                # Create a single sky cell exposure product object
                # SkyCellExposure(prop_id, obset_id, instrument, detector, aperture,
                #                 filename, layer, filetype, log_level)
                sep_obj = SkyCellExposure(
                    prop_id,
                    obset_id,
                    prod_list[1],
                    prod_list[2],
                    "empty_aperture",  # aperture
                    filename[1],
                    layer,
                    ftype,
                    log_level,
                )

                # set flag to record whether this is a 'new' exposure or one that
                # has already been aligned to a layer:
                #   1 means it is 'new' to the layer
                #   0 means it is being reprocessed
                # sep_obj.new_process = int(prod_list[-2])

                # Set up the filter product dictionary and create a filter product object.
                # The SkyCellProduct is the MVM equivalent of the SVM FilterProduct.  Each
                # SkyCellProduct is comprised of a list of exposures in the visit which
                # were taken with the same filter.
                # Initialize `info` key for this filter product dictionary
                if not obset_products[fprod]["info"]:
                    obset_products[fprod]["info"] = prod_info

                    # Create a filter product object for this instrument/detector
                    filt_obj = SkyCellProduct(
                        str(0),
                        str(0),
                        prod_list[1],
                        prod_list[2],
                        "empty_aperture",  # aperture
                        prod_list[0],
                        layer,
                        ftype,
                        log_level,
                    )

                # Append exposure object to the list of exposure objects for this
                # specific filter product object
                filt_obj.add_member(sep_obj)
                # Populate filter product dictionary with input filename
                obset_products[fprod]["files"].append(filename[1])

                # Check to see whether an additional layer with a different plate scale
                # should be generated; primarily for WFC3/IR fine vs coarse layers
                if pscale == "coarse":
                    fprod_fine = FP_STR.format(filt_indx + 1)
                    filt_indx_inc = 2
                    # Generate 'fine' layer as well
                    prod_info_fine = prod_info.replace("coarse", "fine")
                    layer_fine = (layer[0], "fine") + layer[2:]
                    if fprod_fine not in obset_products:
                        obset_products[fprod_fine] = {
                            "info": prod_info_fine,
                            "files": [],
                        }

                        # Create a filter product object for this instrument/detector
                        filt_obj_fine = SkyCellProduct(
                            str(0),
                            str(0),
                            prod_list[1],
                            prod_list[2],
                            "empty_aperture",  # aperture
                            prod_list[0],
                            layer_fine,
                            ftype,
                            log_level,
                        )
                    obset_products[fprod_fine]["files"].append(filename[1])
                    filt_obj_fine.add_member(sep_obj)

            filt_indx += filt_indx_inc

            # Add this filter object to the list for creating that layer BUT
            # ONLY if there are new exposures being processed for this layer,
            # if filt_obj.new_to_layer > 0:
            # Append filter object to the list of filter objects for this
            # specific total product object
            log1 = "Attach the sky cell layer object {}"
            log2 = "to its associated total product object {}/{}."
            log.debug(
                " ".join([log1, log2]).format(
                    filt_obj.filters, filt_obj.instrument, filt_obj.detector
                )
            )

            filt_obj.add_all_mvm_exposures_list(all_mvm_exposures)
            if pscale == "coarse":
                filt_obj_fine.add_all_mvm_exposures_list(all_mvm_exposures)

            # For MVM processing there are no TotalProduct objects, and the TotalProduct
            # list is comprised of the SkyCellProduct objects which are equivalent to the
            # FilterProduct objects in SVM processing.  The need here is to gather all
            # the SkyCellProduct objects, whether they are "coarse" or "fine", into a list.
            tdp_list.append(filt_obj)
            if pscale == "coarse":
                tdp_list.append(filt_obj_fine)

    # Done... return dict and object product list
    return obset_products, tdp_list
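

# Illustrative parsing of a SkyCell ID into the projection cell number and the
# x/y sky cell coordinates, mirroring the prop_id/obset_id logic in
# parse_mvm_tree() (the ID below follows the docstring examples):
# >>> cellid = "skycell-p2381x05y09".split("-")[1]
# >>> xindx = cellid.index("x")
# >>> (cellid[1:xindx], cellid[xindx:])
# ('2381', 'x05y09')


# -----------------------------------------------------------------------------
# Utility Functions
#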
def parse_obset_tree(det_tree, log_level):
    """Convert tree into products

    Tree generated by ``create_row_info()`` will always have the following levels:

    * detector
        * filters
            * exposure

    Each exposure will have an entry dict with keys 'info' and 'filename'.

    Products created will be:

    * total detection product per detector
    * filter products per detector
    * single exposure product
    * grism/prism single exposure product [only if Grism/Prism exposures]

    Grism/Prism images were added to the "doProcess" list after most of this
    code was developed as they need to have the same Primary WCS as the direct
    images from the same detector in the visit.  These images are deliberately
    not added to the output obset_products as they will not be drizzled.  They
    are added as individual GrismExposureProduct objects to the TotalProduct
    only.  The GrismExposureProduct list in the TotalProduct is separate and
    distinct from the ExposureProduct list of direct images.

    """
    log.setLevel(log_level)

    # Initialize products dict
    obset_products = {}

    # For each level, define a product, starting with the detector used...
    prev_det_indx = 0
    det_indx = 0
    filt_indx = 0
    sep_indx = 0

    tdp_list = []

    # Set up products for each detector used
    for filt_tree in det_tree.values():
        totprod = TDP_STR.format(det_indx)
        obset_products[totprod] = {"info": "", "files": []}
        det_indx += 1

        # Find all filters used...
        for filter_files in filt_tree.values():
            # Use this to create and populate filter product dictionary entry
            fprod = FP_STR.format(filt_indx)
            obset_products[fprod] = {"info": "", "files": []}
            filt_indx += 1

            # Populate single exposure dictionary entry now as well
            filter_members, filetype = select_common_filetype(filter_files)
            for filename, is_member in zip(filter_files, filter_members):
                if det_indx != prev_det_indx:
                    prev_det_indx = det_indx

                if not is_member:
                    # This logic should get the opposite filetype from the
                    # value returned for the entire filter (at least that is the plan)
                    exp_filetype = "drz" if filetype == "drc" else "drc"
                else:
                    exp_filetype = filetype

                # Generate the full product dictionary information string:
                # proposal_id, obset_id, instrument, detector, ipppssoot, filter, and filetype
                # filename = ('11515 01 WFC3 UVIS IACS01T9Q F200LP', 'iacs01t9q_flt.fits')
                exp_prod_info = (filename[0] + " " + exp_filetype).lower()

                # Set up the single exposure product dictionary
                sep = SEP_STR.format(sep_indx)
                obset_products[sep] = {"info": exp_prod_info, "files": [filename[1]]}
                # Increment single exposure index
                sep_indx += 1

                # Create a single exposure product object.
                # The GrismExposureProduct is only an attribute of the TotalProduct.
                prod_list = exp_prod_info.split(" ")

                # prod_list is 0: proposal_id, 1: observation_id, 2: instrument,
                # 3: detector, 4: aperture_from_poller, 5: filename, 6: filters,
                # 7: filetype
                #
                # The prod_list[6] is the filter - use this information to distinguish
                # between a direct exposure for drizzling (ExposureProduct) and an
                # exposure (GrismExposureProduct) which is carried along (Grism/Prism)
                # to make analysis easier for the user by having the same WCS in both
                # the direct and Grism/Prism products.

                # Determine if this image is a Grism/Prism or a nominal direct exposure
                is_grism = False
                if (
                    prod_list[6].lower().find("g") != -1
                    or prod_list[6].lower().find("pr") != -1
                ):
                    is_grism = True
                    filt_indx -= 1
                    grism_sep_obj = GrismExposureProduct(
                        prod_list[0],  # prop_id
                        prod_list[1],  # obset_id
                        prod_list[2],  # instrument
                        prod_list[3],  # detector
                        prod_list[4],  # aperture_from_poller
                        filename[1],  # filename
                        prod_list[6],  # filters
                        prod_list[7],  # filetype
                        log_level,  # log_level
                    )
                else:
                    sep_obj = ExposureProduct(
                        prod_list[0],
                        prod_list[1],
                        prod_list[2],
                        prod_list[3],
                        prod_list[4],
                        filename[1],
                        prod_list[6],
                        prod_list[7],
                        log_level,
                    )

                # Now that we have defined the ExposureProduct for this input exposure,
                # do not include it in any total or filter product.
                if not is_member:
                    continue

                prod_info = (filename[0] + " " + filetype).lower()
                # Redefine for the filter and total product definitions.
                # This will ensure the correct product type (DRZ vs DRC) gets passed along.
                prod_list = prod_info.split(" ")

                # Set up the filter product dictionary and create a filter product object.
                # Initialize `info` key for this filter product dictionary
                if not is_grism:
                    if not obset_products[fprod]["info"]:
                        obset_products[fprod]["info"] = prod_info

                        # Create a filter product object for this instrument/detector
                        filt_obj = FilterProduct(
                            prod_list[0],
                            prod_list[1],
                            prod_list[2],
                            prod_list[3],
                            prod_list[4],
                            prod_list[5],
                            prod_list[6],
                            prod_list[7],
                            log_level,
                        )

                    # Append exposure object to the list of exposure objects for this
                    # specific filter product object
                    filt_obj.add_member(sep_obj)
                    # Populate filter product dictionary with input filename
                    obset_products[fprod]["files"].append(filename[1])

                # Set up the total detection product dictionary and create a total
                # detection product object.
                # Initialize `info` key for total detection product
                if not obset_products[totprod]["info"]:
                    obset_products[totprod]["info"] = prod_info

                    # Create a total detection product object for this instrument/detector
                    tdp_obj = TotalProduct(
                        prod_list[0],
                        prod_list[1],
                        prod_list[2],
                        prod_list[3],
                        prod_list[4],
                        prod_list[5],
                        prod_list[7],
                        log_level,
                    )

                if not is_grism:
                    # Append exposure object to the list of exposure objects for this
                    # specific total detection product
                    tdp_obj.add_member(sep_obj)
                    # Populate total detection product dictionary with input filename
                    obset_products[totprod]["files"].append(filename[1])

                    # Append filter object to the list of filter objects for this
                    # specific total product object
                    log.debug(
                        "Attach the filter object {} to its associated total detection product object {}/{}.".format(
                            filt_obj.filters, tdp_obj.instrument, tdp_obj.detector
                        )
                    )

                    # Identify what exposures should use the single-image CR
                    # identification algorithm
                    is_ccd = not (
                        filt_obj.instrument.lower() == "wfc3"
                        and filt_obj.detector.lower() == "ir"
                    )
                    if is_ccd and len(filt_obj.edp_list) == 1:
                        for e in filt_obj.edp_list:
                            e.crclean = True
                elif is_grism:
                    tdp_obj.add_grism_member(grism_sep_obj)

                if not is_grism:
                    tdp_obj.add_product(filt_obj)

        # Add the total product object to the list of TotalProducts
        tdp_list.append(tdp_obj)

    # If the total product object (TDP) has Grism/Prism members, it MUST have direct
    # exposure members too, or the TDP is not valid.  If the TDP is not valid, it is
    # still necessary to generate a log file for proper tracking of the processing,
    # so just delete the on-disk SVM FLT/FLC files for the Grism/Prism images.  Allow
    # the TDP to be passed back to run_hap_processing (hapsequencer.py) where the
    # validity of the product can be handled.
    for tdp in tdp_list:
        if tdp.grism_edp_list and not tdp.edp_list:
            for item in tdp.grism_edp_list:
                try:
                    os.remove(item.full_filename)
                except OSError:
                    pass

    # Done... return dict and object product list
    return obset_products, tdp_list
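

# Illustrative behavior of the Grism/Prism test used in parse_obset_tree()
# (a sketch only; the lambda is not part of the module API):
# >>> is_grism = lambda f: f.lower().find("g") != -1 or f.lower().find("pr") != -1
# >>> is_grism("G800L"), is_grism("PR200L"), is_grism("F555W")
# (True, True, False)
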
# ------------------------------------------------------------------------------


def select_common_filetype(filter_files):
    """Select the common filetype for the input list of files for a filter product.

    Parameters
    ----------
    filter_files : list
        List of tuples describing input files for the filter product

    Returns
    -------
    filter_members : list
        Mask indicating which input files correspond to the output product type

    prod_type : str
        String corresponding to the suffix (DRZ vs DRC) to be used for the
        output product based on the common filetype (FLT vs FLC)
    """
    filetypes = np.array([f[1][:-5].split("_")[-1] for f in filter_files])
    # Check whether or not all input files for this filter product have the
    # same filetype (suffix)
    filetypes_flt = filetypes == "flt"
    filetypes_flc = filetypes == "flc"
    num_flc = len(np.where(filetypes_flc)[0])

    # Determine whether we have FLC images in the set or not,
    # and set the filter_type accordingly
    filter_type = "flc" if num_flc > 0 else "flt"
    filter_members = filetypes_flc if filter_type == "flc" else filetypes_flt

    # Return the mask of member input files, along with the suffix
    # for the files in the input list for this filter product
    prod_type = "drc" if filter_type == "flc" else "drz"

    return filter_members, prod_type


# ------------------------------------------------------------------------------


def define_exp_layers(obset_table, method="hard", exp_limit=None):
    """Determine what exposures will be grouped into the same layer of a sky cell"""
    # Add 'exp_layer' column to table
    if method not in SUPPORTED_EXP_METHODS:
        raise ValueError(
            "Please use a supported method: {}".format(SUPPORTED_EXP_METHODS)
        )

    if method == "kmeans":
        # Only cluster the exposure times if their range exceeds exp_limit
        exptime_range = obset_table["exptime"].max() / obset_table["exptime"].min()
        if exp_limit is not None and exptime_range > exp_limit:
            # KMeans expects a 2-D array of shape (n_samples, n_features)
            kmeans = KMeans(n_clusters=3, random_state=0).fit(
                np.asarray(obset_table["exptime"]).reshape(-1, 1)
            )
            # Sort and identify the clusters in increasing duration
            centers = kmeans.cluster_centers_.reshape(1, -1)[0].argsort()
            exp_layer = [centers[l] for l in kmeans.labels_]
        else:
            exp_layer = [None] * len(obset_table)
    elif method == "all":
        exp_layer = [None] * len(obset_table)
    else:
        # Use pre-defined limits for selecting layer members.
        # Subtraction by 1 puts the range at 0-2 to be consistent with KMeans.
        exp_layer = np.digitize(obset_table["exptime"], EXP_LIMITS) - 1

    # Add column to the table as labeled values ('SHORT', 'MED', 'LONG', 'ALL')
    obset_table["exp_layer"] = [EXP_LABELS[e].upper() for e in exp_layer]


# ------------------------------------------------------------------------------


def determine_filter_name(raw_filter):
    """
    Generate the final filter name to be used for an observation.

    Parameters
    ----------
    raw_filter : string
        Filters component of one exposure from an input observation visit

    Returns
    -------
    filter_name : string
        Final filter name

    If the raw_filter is actually a combination of two filter names, as can
    be true for instruments with two filter wheels, then generate a new
    filter string according to the following rules:

    - If one filter name is 'clear*', then use the other filter name.
    - If both filter names are 'clear*', then use 'clear'.
    - If there are two filters in use, then use 'filter1-filter2'.
    - If one filter is a polarizer ('pol*'), then always put the polarizer
      name second (e.g., 'f606w-pol60').
    - If one filter is 'n/a' (as is the case for SBC), then use the other
      filter name.

      NOTE: At this time (February 2020) the filter names for the SBC
      detector of ACS have malformed names (e.g., F140LP;N/A;F140LP) where
      the final token delimited by the ";" should not be present.  Remove
      the final delimiter and entry.  Unlike the other ACS detectors, SBC
      only uses a single filter wheel.

    - NOTE: There should always be at least one filter name provided to
      this routine or the input is invalid.
    """

    raw_filter = raw_filter.lower()

    # There might be multiple filters, so split the filter names into a list
    # and only retain the first two entries.  SBC has a bogus third entry.
    filter_list = raw_filter.split(";")[0:2]
    output_filter_list = []

    for filt in filter_list:
        if not any(x in filt for x in ["clear", "n/a"]):
            output_filter_list.append(filt)

    if not output_filter_list:
        output_filter_list = ["clear"]
    else:
        if output_filter_list[0].startswith("pol"):
            output_filter_list.reverse()

    delimiter = "-"
    filter_name = delimiter.join(output_filter_list).rstrip(delimiter)

    return filter_name
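

# Illustrative behavior of determine_filter_name(), mirroring the rules in its
# docstring (the filter strings are examples only):
# >>> determine_filter_name("F606W;CLEAR2L")
# 'f606w'
# >>> determine_filter_name("CLEAR1L;CLEAR2L")
# 'clear'
# >>> determine_filter_name("F606W;POL60V")
# 'f606w-pol60v'
# >>> determine_filter_name("F140LP;N/A;F140LP")
# 'f140lp'


# ------------------------------------------------------------------------------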
def build_poller_table(
    input: str,
    log_level,
    all_mvm_exposures=[],
    poller_type="svm",
    include_small=True,
    only_cte=False,
):
    """Create a poller file from dataset names for either SVM or MVM processing.

    Information is either gathered from the poller file or by using the filename
    to open the file and pulling information from the header keywords.  The code
    treats WFPC2 differently, using both approaches: for WFPC2, we use simple
    poller files with a second column that includes the aperture, and the code
    gathers the rest of the relevant information from the header keywords.

    Parameters
    ----------
    input : str, list
        Filename with list of dataset names, or just a Python list of dataset names,
        provided by the user.

    log_level : int
        The desired level of verboseness in the log statements displayed on the screen
        and written to the .log file.

    all_mvm_exposures : list
        List of all the MVM FLT/FLC exposure filenames in the list.  This will include
        the filenames which are dropped because they have been processed previously.

    poller_type : str, optional
        The type of poller file being processed.  Either 'svm' for single visit mosaic,
        or 'mvm' for multi-visit mosaic.  Unless explicitly specified, the default
        value is 'svm'.

    include_small : bool, optional
        Specify whether or not to create products from ACS/HRC and ACS/SBC data.

    only_cte : bool, optional
        Specify whether or not to create products only using CTE-corrected data.

    Returns
    -------
    poller_table : Table
        Astropy table object with the same columns as a poller file.

    """
    log.setLevel(log_level)

    is_poller_file = False
    # Check the input file is not empty
    if not isinstance(input, list) and not os.path.getsize(input):
        log.error(
            "Input poller manifest file, {}, is empty - processing is exiting.".format(
                input
            )
        )
        sys.exit(0)

    if poller_type == "mvm":
        poller_colnames = MVM_POLLER_COLNAMES
        poller_dtype = MVM_POLLER_DTYPE
    else:
        poller_colnames = POLLER_COLNAMES
        poller_dtype = POLLER_DTYPE

    datasets = []
    # limit column string types to minimum length formats e.g. str8, str11, etc.
    obs_converters = {"col4": [ascii.convert_numpy(np.str_)]}

    if isinstance(input, str):
        input_table = ascii.read(input, format="no_header", converters=obs_converters)

        if len(input_table.columns) == 1:
            input_table.columns[0].name = "filename"
            input_table["aperture"] = "empty_aperture"
            poller_dtype += ("str",)
            # gets important keywords from file headers instead of poller file
            is_poller_file = False

        # unique logic to collect WFPC2 aperture data from poller file
        elif len(input_table.columns) == 2:
            input_table.columns[0].name = "filename"
            input_table.columns[1].name = "aperture"
            # add dtype for aperture column
            poller_dtype += ("str",)
            is_poller_file = False

        elif len(input_table.columns) == len(poller_colnames):
            # We were provided a poller file.
            # Now assign column names to the table.
            for i, colname in enumerate(poller_colnames):
                input_table.columns[i].name = colname

            # Convert to a string column, instead of int64
            input_table["obset_id"] = input_table["obset_id"].astype(np.str_)

            # Convert string column into a Bool column.
            # The input poller file reports True if it has been reprocessed.
            # This code interprets that as False since it is NOT new, so the code
            # inverts the meaning from the pipeline poller file.
            if poller_type == "mvm":
                # Translate new format back to old format ("NEW" -> 0 and "OLD" -> 1)
                poller_table_mapping = {"NEW": 0, "OLD": 1}
                reverse_table_mapping = {"0": "NEW", "1": "OLD"}
                rows_to_drop = []
                for tbl_ctr in range(0, len(input_table)):
                    all_mvm_exposures.append(input_table[tbl_ctr]["filename"])
                    if input_table[tbl_ctr]["skycell_new"].upper() in ["OLD", "NEW"]:
                        if input_table[tbl_ctr]["skycell_new"].upper() == "OLD":
                            rows_to_drop.append(tbl_ctr)
                        input_table[tbl_ctr]["skycell_new"] = poller_table_mapping[
                            input_table[tbl_ctr]["skycell_new"].upper()
                        ]
                    elif input_table[tbl_ctr]["skycell_new"] in ["0", "1"]:
                        err_msg = "'{}' is an invalid skycell_new poller file value. (Legal values: 'NEW' or 'OLD'). Please use '{}' instead of '{}'. Exiting... ".format(
                            input_table[tbl_ctr]["skycell_new"],
                            reverse_table_mapping[input_table[tbl_ctr]["skycell_new"]],
                            input_table[tbl_ctr]["skycell_new"],
                        )
                        log.error(err_msg)
                        raise Exception(err_msg)
                    else:
                        err_msg = "'{}' is an invalid skycell_new poller file value. (Legal values: 'NEW' or 'OLD'). Exiting... ".format(
                            input_table[tbl_ctr]["skycell_new"]
                        )
                        log.error(err_msg)
                        raise Exception(err_msg)

                    # Apply logic for ignoring additional data, based on which
                    # environment variables are defined when defining what output
                    # SkyCell layers to generate from a poller file.
                    # Need to ignore non-CTE-corrected UVIS data.
                    # Note: ignoring ACS/HRC and ACS/SBC from the input list for MVM
                    # processing is done in analyze.analyze_wrapper().
                    cte_flag = input_table[tbl_ctr]["filename"][-6] == "c"
                    # for WFC3 data, if UVIS and not CTE-corrected, flag for removal from processing
                    wf3_cte = (
                        input_table[tbl_ctr]["detector"].upper() == "UVIS"
                        and not cte_flag
                    )
                    # for ACS data, if WFC and not CTE-corrected, flag for removal from processing
                    acs_cte = (
                        input_table[tbl_ctr]["detector"].upper() == "WFC"
                        and not cte_flag
                    )
                    # if only CTE-corrected data is requested, and either acs_cte or
                    # wf3_cte is True, then remove the row from processing
                    if only_cte and (wf3_cte or acs_cte):
                        rows_to_drop.append(tbl_ctr)

                # Omit input images listed as "OLD" in the poller file from processing
                if len(rows_to_drop) == len(input_table):
                    err_msg = "All images have already been MVM processed. No new MVM processing is needed. Exiting..."
                    log.error(err_msg)
                    sys.exit(analyze.Ret_code.NO_VIABLE_DATA.value)
                elif len(rows_to_drop) == 0:
                    log.info(
                        "None of the input images have previously been MVM processed. Proceeding with MVM processing of all input images... "
                    )
                else:
                    log.info(
                        "The following {} input image(s) have already been MVM processed and will be omitted from MVM processing:".format(
                            len(rows_to_drop)
                        )
                    )
                    for tbl_idx in rows_to_drop:
                        log.info(" {}".format(input_table[tbl_idx]["filename"]))
                    input_table.remove_rows(rows_to_drop)

                input_table["skycell_new"] = [
                    int(not BOOL_STR_DICT[str(val).upper()])
                    for val in input_table["skycell_new"]
                ]

            is_poller_file = True

            # Check that each file listed in the poller file exists in the current
            # working directory.  If a file is missing, copy it over from the path
            # specified in the poller file.  Failing that, raise an exception and exit.
            for table_line in input_table:
                if os.path.exists(table_line["filename"]):
                    log.info(
                        f"Input image {table_line['filename']} found in current working directory."
                    )
                elif os.path.exists(table_line["pathname"]):
                    log.info(
                        f"Input image {table_line['filename']} not found in current working directory. However, it was found in the path specified in the poller file."
                    )
                    shutil.copy(table_line["pathname"], os.getcwd())
                    log.info(
                        f"Input image {table_line['pathname']} copied to current working directory."
                    )
                else:
                    log.error(
                        f"Input image {table_line['filename']} not found in current working directory."
                    )
                    log.error(
                        f"Archived input image {table_line['pathname']} not found."
                    )
                    err_msg = f"Input image {table_line['filename']} missing from current working directory and from the path specified in the poller file. Exiting... "
                    log.error(err_msg)
                    raise Exception(err_msg)

        elif (poller_type == "mvm") and (
            len(input_table.columns) != len(poller_colnames)
        ):
            log.error(
                f"MVMs should use full poller files with {len(poller_colnames)} columns."
            )
            err_msg = f"Full poller files should have {len(poller_colnames)} columns. Exiting... "
            log.error(err_msg)
            raise Exception(err_msg)

        # input is a string with an unexpected number of columns
        else:
            log.error(
                f"Poller file has an unexpected number of columns; the code expects either 1, 2, or {len(poller_colnames)} but received: {len(input_table.columns)}"
            )
            raise ValueError

        # Since a poller file was the input, it is assumed all the input
        # data is in the local directory, so just collect the filenames.
        # datasets = input_table[input_table.colnames[0]].tolist()
        filenames = list(input_table.columns[0])

    # If input is a list of filenames
    elif isinstance(input, list):
        filenames = input
        input_table = None

    else:
        id = "[poller_utils.build_poller_table] "
        log.error(
            "{}: Input {} not supported as input for processing.".format(id, input)
        )
        raise ValueError

    # At this point, we have a poller file or a list of filenames.  If the latter,
    # then any individual filename can be a singleton or an association name.  We
    # need to get the full list of actual filenames from the association name.
    if not is_poller_file:
        for filename in filenames:
            # Look for dataset in local directory.
            if "asn" in filename or not os.path.exists(filename):
                # This retrieval will NOT overwrite any ASN members already on
                # local disk.  Return value will still be a list of all members.
                files = aqutils.retrieve_observation(
                    [filename[:9]], suffix=["FLC"], clobber=False
                )
                if len(files) == 0:
                    log.error("Filename {} not found in archive!!".format(filename))
                    log.error("Please provide ASN filename instead!")
                    raise ValueError
            else:
                files = [filename]
            datasets += files
        all_mvm_exposures = datasets
    else:
        datasets = filenames

    # Each image, whether from a poller file or from an input list, needs to be
    # analyzed to ensure it is viable for drizzle processing.  If the image is
    # not viable, it should not be included in the output "poller" table.
    # NOTE: The "all_mvm_exposures" variable does not need to be checked for
    # "usable" datasets as it is used to create a WCS/footprint to accommodate
    # *all* exposures in an MVM visit.
    usable_datasets, usable_dataset_index, return_code = analyze.analyze_wrapper(
        datasets, use_sbchrc=include_small, type=poller_type
    )
    if not usable_datasets:
        log.warning(
            "No usable images in poller file or input list for drizzling. The processing of this data is ending."
        )
        sys.exit(return_code)
    else:
        log.info(
            "There are {} usable images identified in the poller file for processing.".format(
                len(usable_datasets)
            )
        )

    cols = OrderedDict()
    for cname in poller_colnames:
        cols[cname] = []
    cols["filename"] = usable_datasets
    if input_table:
        if "aperture" in input_table.colnames:
            aperture_all_datasets = input_table["aperture"].tolist()
            aperture_of_usable_dataset = np.array(aperture_all_datasets)[
                usable_dataset_index
            ]
            cols["aperture"] = aperture_of_usable_dataset
        else:
            cols["aperture"] = ["empty_aperture"] * len(usable_datasets)
            add_col = Column(
                ["empty_aperture"] * len(input_table), name="aperture", dtype="str"
            )
            input_table.add_column(add_col, index=7)
            poller_dtype += ("str",)
    else:
        raise ValueError("Input table is empty. Exiting...")

    # If MVM processing and a poller file is the input, this implies there is
    # only one skycell of interest for all the listed filenames in the poller
    # file.  Establish the WCS, but there is no need for discovery of overlapping
    # skycells as would be the case for an input list of filenames.
    if poller_type == "mvm" and is_poller_file:
        pipeline_skycell_id = input_table[0]["skycell_id"]
        scells = {}
        skycell_obj = cell_utils.SkyCell.from_name(pipeline_skycell_id)
        skycell_obj.members = filenames
        scells[pipeline_skycell_id] = skycell_obj
        scell_files = cell_utils.interpret_scells(scells)
        log.debug("MVM and poller file. scell_files: {}".format(scell_files))

    # If processing a list of files, evaluate each input dataset for the
    # information needed for the poller file
    if not is_poller_file:
        for d in usable_datasets:
            with fits.open(d) as dhdu:
                hdr = dhdu[0].header
                cols["program_id"].append(d[1:4].upper())
                cols["obset_id"].append(str(d[4:6]))
                cols["proposal_id"].append(hdr["proposid"])
                cols["exptime"].append(hdr["exptime"])
                cols["detector"].append(hdr["detector"])
                cols["pathname"].append(os.path.abspath(d))
                # process filter names
                if d[0] == "j":  # ACS data
                    filters = processing_utils.get_acs_filters(dhdu, all=True)
                elif d[0] == "i":
                    filters = hdr["filter"]
                elif d[0] == "u":
                    filters = processing_utils.get_wfpc2_filters(dhdu, all=True)
                cols["filters"].append(filters)

        if poller_type == "mvm":
            # interpret_scells returns:
            #     {'filename1': {'<sky cell id1>': SkyCell1,
            #                    '<sky cell id2>': SkyCell2,
            #                    'id': "<sky cell id1>;<sky cell id2>"},
            #      'filename2': ...}
            # This preserves 1 entry per filename, while providing info on
            # multiple SkyCells for each filename as appropriate.
            cols["skycell_id"] = [
                scell_files[fname]["id"] for fname in cols["filename"]
            ]
            cols["skycell_new"] = [1] * len(cols["filename"])

        #
        # Build output table
        #
        poller_data = [col for col in cols.values()]
        poller_names = [colname for colname in cols]
        poller_table = Table(data=poller_data, names=poller_names, dtype=poller_dtype)

    # The input was a poller file, so just keep the viable data rows for output
    else:
        good_rows = []
        for d in usable_datasets:
            for i, old_row in enumerate(input_table):
                if d == input_table["filename"][i]:
                    good_rows.append(old_row)
        # This table contains the pipeline-specified skycell_id for each row,
        # which should be the same value in every row
        poller_table = Table(
            rows=good_rows, names=input_table.colnames, dtype=poller_dtype
        )

    #
    # If 'mvm' poller file, expand any multiple skycell entries into separate rows
    #
    if poller_type == "mvm":
        # A new row will need to be added for each additional SkyCell that the
        # file overlaps...
        #
        poller_table["skycell_obj"] = [None] * len(poller_table)
        #
        # Make a copy of the original poller_table
        #
        new_poller_table = poller_table[poller_table["filename"] != None]

        for name in scell_files:
            for scell_id in scell_files[name]:
                if scell_id != "id":
                    scell_obj = scell_files[name][scell_id]
                    for indx, row in enumerate(poller_table):
                        if row["filename"] != name:
                            continue
                        if new_poller_table[indx]["skycell_obj"] is None:
                            new_poller_table[indx]["skycell_obj"] = scell_obj
                            new_poller_table[indx]["skycell_id"] = scell_id
                        else:
                            poller_rows = poller_table[
                                poller_table["filename"] == name
                            ]
                            sobj0 = poller_rows["skycell_obj"][0]
                            # Select only 1 row regardless of how many we have
                            # already added for this filename (in case the file
                            # overlapped more than 2 sky cells at once).
                            poller_row = poller_rows[
                                poller_rows["skycell_obj"] == sobj0
                            ]
                            # make a copy of the row for this filename and
                            # assign updated values to the skycell columns
                            poller_row["skycell_id"] = scell_id
                            poller_row["skycell_obj"] = scell_obj
                            # append new row to table
                            new_poller_table.add_row(poller_row[0])

        # All the work has been done to set up the poller table, which accommodates
        # both a pipeline-supplied poller file as well as a list of filenames.  The
        # table contains all sky cells that overlap with the input files.  However,
        # the pipeline-supplied poller file originally specified the single skycell
        # to be processed under "pipeline" conditions.  If this invocation is by a
        # poller file, then trim the table to contain only the rows which match the
        # sky cell specified in the poller file.
        pipeline_skycell_id = poller_table[0]["skycell_id"]
        new_poller_table = new_poller_table[
            new_poller_table["skycell_id"] == pipeline_skycell_id
        ]
        if not new_poller_table:
            log.error(
                "No sky cell found which matches the sky cell specified in the poller file {}.".format(
                    pipeline_skycell_id
                )
            )
            sys.exit(0)
        poller_table = new_poller_table

    return poller_table
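

# Illustrative usage sketch for build_poller_table() (the poller filename is
# hypothetical):
# >>> tbl = build_poller_table("wfc3_visit06.out", logutil.logging.INFO)
# >>> tbl.colnames[:4]
# ['filename', 'proposal_id', 'program_id', 'obset_id']
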
# ------------------------------------------------------------------------------


def sort_poller_table(obset_table):
    """Sort the input table by photflam and exposure time.

    The alignment of data within a single visit is first done in a relative way,
    successively adding images to the stack within the visit.  The next step is
    to achieve absolute alignment of the stack of images to the GAIA catalog.
    One scheme to obtain the largest number of quality (unsaturated)
    fiducials/sources is to sort the images first according to their inverse
    sensitivity (photflam) and then according to their exposure time.

    The obset_table has the following columns: filename, proposal_id, program_id,
    obset_id, exptime, filters, detector, and pathname

    Parameters
    ----------
    obset_table : Astropy table
        Astropy table object with columns which map to the original poller file

    Returns
    -------
    updated_obset_table : Astropy table
        The sorted version of the input Astropy table
    """

    # Create a copy of the input table and add the photflam column with a filler value
    expanded_obset_table = Table(obset_table)
    expanded_obset_table["flam"] = -999999.0

    for row in expanded_obset_table:
        input_file = row[expanded_obset_table.colnames[0]]

        # Open the specified FITS file
        h0 = getheader(input_file, 0)

        # Need to get the instrument and detector keywords in order to determine
        # where to look for the various necessary keywords (i.e., primary or
        # extension)
        instrument = h0["instrume"].upper()
        detector = h0["detector"].upper()

        # HST IMAGE
        # photflam: inverse sensitivity, ergs/s/cm**2/Angstrom for 1 electron/s
        # The PHOTFLAM keyword is found in each science extension for the currently
        # supported instruments (as of 01 Jan 2020), except for WFC3/IR where
        # PHOTFLAM is in the Primary.
        #
        # Although the PHOTFLAM keyword is science extension-dependent,
        # the differences in values are so small as to not be relevant in
        # this particular context.
        if instrument == "WFC3" and detector == "IR":
            photflam = h0["photflam"]
        else:
            h1 = getheader(input_file, "sci", 1)
            photflam = h1["photflam"]

        row["flam"] = photflam

    # Determine the rank order of the data with a primary key of exposure time
    # (ascending) and a secondary key of photflam (descending).  Use the rank
    # to sort the original input table for output.
    #
    # Unfortunately, there are cases where the alternatives work better; namely,
    # fields with few bright point sources which are saturated in the longer
    # exposure time images (like j8cw03 and j8cw53).
    # rank = np.lexsort((-expanded_obset_table['flam'], -expanded_obset_table['exptime']))
    # Original implementation:
    # rank = np.lexsort((expanded_obset_table['exptime'], -expanded_obset_table['flam']))
    rank = np.lexsort((-expanded_obset_table["flam"], expanded_obset_table["exptime"]))
    updated_obset_table = obset_table[rank]

    return updated_obset_table


# ------------------------------------------------------------------------------


def add_primary_fits_header_as_attr(hap_obj, log_level=logutil.logging.NOTSET):
    """Create an object attribute containing the image primary header info.

    Parameters
    ----------
    hap_obj : drizzlepac.haputils.Product.TotalProduct, drizzlepac.haputils.Product.FilterProduct, or
        drizzlepac.haputils.Product.ExposureProduct, depending on input
        Object to update

    log_level : int, optional
        The desired level of verboseness in the log statements displayed on the
        screen and written to the .log file.

    Returns
    -------
    hap_obj : drizzlepac.haputils.Product.TotalProduct, drizzlepac.haputils.Product.FilterProduct, or
        drizzlepac.haputils.Product.ExposureProduct, depending on input
        Updated version of the input object
    """
    if os.path.exists(hap_obj.drizzle_filename):
        file_name = hap_obj.drizzle_filename
    else:
        file_name = hap_obj.full_filename

    hap_obj.primary_header = fits.getheader(file_name)
    log.info("Added primary header info from file {} to {}.".format(file_name, hap_obj))

    return hap_obj
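

# Illustrative np.lexsort behavior used in sort_poller_table() above: the *last*
# key passed to lexsort is the primary key, so rows are ordered by increasing
# exptime with ties broken by decreasing photflam (values hypothetical):
# >>> import numpy as np
# >>> exptime = np.array([30.0, 10.0, 30.0])
# >>> flam = np.array([1e-19, 5e-19, 9e-19])
# >>> np.lexsort((-flam, exptime))
# array([1, 2, 0])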