Source code for drizzlepac.photeq

"""
A tool to adjust data values of images by equalizing each chip's PHOTFLAM value
to a single common value so that all chips can be treated equally
by ``AstroDrizzle``.

:Authors: Mihai Cara

:License: :doc:`LICENSE`

"""

__all__ = ['photeq']
__taskname__ = 'drizzlepac.photeq'
__version__ = '0.2'
__version_date__ = '06-Nov-2015'
__author__ = 'Mihai Cara'

# HISTORY:
# v0.1 - 14-Aug-2015 - Initial release
# v0.2 - 06-Nov-2015 - Added docstrings, logging, TEAL interface


# STDLIB
import os
from datetime import datetime
import logging

# THIRD PARTY
import numpy as np
from astropy.io import fits

try:
    from stsci.tools import teal
except ImportError:
    teal = None

# LOCAL
from stsci.skypac import parseat, utils
from . import util

# create logger
logging.captureWarnings(capture=True)
_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)
_log_formatter = logging.Formatter('%(message)s')

# create console handler and set level to debug
_sh_log = logging.StreamHandler()
_sh_log.setLevel(logging.DEBUG)
_sh_log.setFormatter(_log_formatter)

# add _sh_log to logger
_log.addHandler(_sh_log)


def _mlinfo(msg, *args, **kwargs):
    # log a (possibly multi-line) message one line at a time at INFO level
    lines = msg.splitlines()
    for line in lines:
        _log.info(line, *args, **kwargs)


def _mlwarn(msg, *args, **kwargs):
    # log a (possibly multi-line) message one line at a time at WARNING level
    lines = msg.splitlines()
    for line in lines:
        _log.warning(line, *args, **kwargs)


def photeq(files='*_flt.fits', sciext='SCI', errext='ERR', ref_phot=None,
           ref_phot_ext=None, phot_kwd='PHOTFLAM', aux_phot_kwd='PHOTFNU',
           search_primary=True, readonly=True, clobber=False,
           logfile='photeq.log'):
    """
    Adjust data values of images by equalizing each chip's PHOTFLAM value
    to a single common value so that all chips can be treated equally
    by ``AstroDrizzle``.

    Parameters
    ----------
    files : str (Default = ``'*_flt.fits'``)
        A string containing one of the following:

        * a comma-separated list of valid science image file names, e.g.:
          ``'j1234567q_flt.fits, j1234568q_flt.fits'``;

        * an @-file name, e.g., ``'@files_to_match.txt'``. See notes
          section for details on the format of the @-files.

        .. note::
            **Valid science image file names** are:

            * file names of existing FITS, GEIS, or WAIVER FITS files;

            * partial file names containing wildcard characters, e.g.,
              ``'*_flt.fits'``;

            * Association (ASN) tables (must have ``_asn``, or ``_asc``
              suffix), e.g., ``'j12345670_asn.fits'``.

    sciext : str (Default = 'SCI')
        Extension *name* of the extensions whose data and/or headers should
        be corrected.

    errext : str (Default = 'ERR')
        Extension *name* of the extensions containing corresponding error
        arrays. Error arrays are corrected in the same way as science data.

    ref_phot : float, None (Default = None)
        A number indicating the new value of PHOTFLAM or PHOTFNU (set by
        `phot_kwd`) to which the data should be adjusted.

    ref_phot_ext : int, str, tuple, None (Default = None)
        Extension from which `photeq` should get the reference photometric
        value specified by the `phot_kwd` parameter. This parameter is
        ignored if `ref_phot` **is not** `None`. When `ref_phot_ext` is
        `None`, the reference inverse sensitivity value will be picked from
        the first `sciext` of the first input image containing `phot_kwd`.

    phot_kwd : str (Default = 'PHOTFLAM')
        Specifies the primary keyword which contains the inverse
        sensitivity (e.g., PHOTFLAM). It is used to compute the conversion
        factors by which data should be rescaled.

    aux_phot_kwd : str, None, list of str (Default = 'PHOTFNU')
        Same as `phot_kwd` but describes *other* photometric keyword(s)
        that should be corrected by the inverse of the scale factor used
        to correct data. These keywords are *not* used to compute
        conversion factors. Multiple keywords can be specified as a Python
        list of strings: ``['PHOTFNU', 'PHOTOHMY']``.

        .. note::
            If specifying multiple secondary photometric keywords in the
            TEAL interface, use a comma-separated list of keywords.

    search_primary : bool (Default = True)
        Specifies whether to first search the primary header for the
        presence of the `phot_kwd` keyword and to compute the conversion
        factor based on that value. This is (partially) ignored when
        `ref_phot` is not `None` in the sense that the value specified by
        `ref_phot` will be used as the reference, *but* the primary header
        of every image will still be searched for `phot_kwd` and
        `aux_phot_kwd`, and those values will be corrected
        (if ``search_primary=True``).

    readonly : bool (Default = True)
        If `True`, `photeq` will not modify input files (nevertheless, it
        will convert input GEIS or WAIVER FITS files to MEF and could
        overwrite existing MEF files if `clobber` is set to `True`). The
        (console or log file) output, however, will be identical to the
        case when ``readonly=False`` and it can be examined before
        applying these changes to the input files.

    clobber : bool (Default = False)
        Overwrite existing MEF files when converting input WAIVER FITS or
        GEIS to MEF.

    logfile : str, None (Default = 'photeq.log')
        File name of the log file.

    Notes
    -----
    By default, `photeq` will search for the first inverse sensitivity
    value (given by the header keyword specified by the `phot_kwd`
    parameter, e.g., PHOTFLAM or PHOTFNU) found in the input images and
    will equalize all other images to this reference value.

    It is possible to tell `photeq` to look for the reference inverse
    sensitivity value only in a specific extension of the input images,
    e.g., 3, ('sci',3), etc. This can be done by setting `ref_phot_ext`
    to a specific extension. This may be useful, for example, for WFPC2
    images: the WF3 chip was one of the better-calibrated chips, and so,
    if one prefers to have inverse sensitivities equalized to the inverse
    sensitivity of the WF3 chip, one can set ``ref_phot_ext=3``.

    Alternatively, one can provide one's own reference inverse sensitivity
    value to which all other images should be "equalized" through the
    parameter `ref_phot`.

    .. note::
        Default parameter values (except for `files`, `readonly`, and
        `clobber`) should be acceptable for most HST images.

    .. warning::
        If images are intended to be used with ``AstroDrizzle``, it is
        recommended that sky background measurement be performed on
        "equalized" images as `photeq` is not aware of the sky user
        keyword in the image headers and thus it cannot correct sky
        values already recorded in the headers.

    Examples
    --------
    #. In most cases the default parameters should suffice:

           >>> from drizzlepac import photeq
           >>> photeq.photeq(files='*_flt.fits', readonly=False)

    #. If the re-calibration needs to be done on PHOTFNU rather than
       PHOTFLAM, then:

           >>> photeq.photeq(files='*_flt.fits', phot_kwd='PHOTFNU',
           ... aux_phot_kwd='PHOTFLAM')

    #. If for WFPC2 data one desires that PHOTFLAM from WF3 be used as
       the reference in WFPC2 images, then:

           >>> photeq.photeq(files='*_flt.fits', ref_phot_ext=3)  # or ('sci',3)
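    #. To preview the scaling that `photeq` would apply without modifying
       any input files, one can keep the default ``readonly=True`` and
       examine the console output or the log file (the input file and log
       file names below are only illustrative):

           >>> photeq.photeq(files='j1234567q_flt.fits', readonly=True,
           ... logfile='photeq_preview.log')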

    """
    # Time it
    runtime_begin = datetime.now()

    # check that input file name is a string:
    if not isinstance(files, str):
        raise TypeError("Argument 'files' must be a comma-separated list of "
                        "file names")

    # Set-up log files:
    if isinstance(logfile, str):
        # first, in case there are any "leftover" file handlers,
        # close and remove them:
        for h in _log.handlers:
            if h is not _sh_log and isinstance(h, logging.FileHandler):
                h.close()
                _log.removeHandler(h)

        # create file handler:
        log_formatter = logging.Formatter('[%(levelname)s:] %(message)s')
        log_file_handler = logging.FileHandler(logfile)
        log_file_handler.setFormatter(log_formatter)

        # add log_file_handler to logger
        _log.addHandler(log_file_handler)

    elif logfile is not None:
        raise TypeError("Unsupported 'logfile' type")

    # BEGIN:
    _mlinfo("***** {0} started on {1}".format(__taskname__, runtime_begin))
    _mlinfo("      Version {0} ({1})".format(__version__, __version_date__))

    # check that extension names are strings (or None for error ext):
    if sciext is None:
        sci_ext4parse = '*'
        ext2get = None
    else:
        if not isinstance(sciext, str):
            raise TypeError("Argument 'sciext' must be a string or None")
        sciext = sciext.strip()
        if sciext.upper() == 'PRIMARY':
            sciext = sciext.upper()
            ext2get = (sciext, 1)
        else:
            ext2get = (sciext, '*')
        sci_ext4parse = ext2get

    if errext is not None and not isinstance(errext, str):
        raise TypeError("Argument 'errext' must be a string or None")

    # check that phot_kwd is supported:
    if not isinstance(phot_kwd, str):
        raise TypeError("Argument 'phot_kwd' must be a string")
    phot_kwd = phot_kwd.strip().upper()

    # check that ref_phot_ext has correct type:
    if ref_phot_ext is not None and not \
       (isinstance(ref_phot_ext, int) or isinstance(ref_phot_ext, str) or
        (isinstance(ref_phot_ext, tuple) and len(ref_phot_ext) == 2 and
         isinstance(ref_phot_ext[0], str) and
         isinstance(ref_phot_ext[1], int))):
        raise TypeError("Unsupported 'ref_phot_ext' type")

    if isinstance(ref_phot_ext, str):
        ref_phot_ext = (ref_phot_ext, 1)

    if aux_phot_kwd is None:
        aux_phot_kwd = []

    elif isinstance(aux_phot_kwd, str):
        aux_phot_kwd = [aux_phot_kwd.strip().upper()]
        if phot_kwd == aux_phot_kwd[0]:
            raise ValueError("Auxiliary photometric keyword must be "
                             "different from the main photometric keyword "
                             "'phot_kwd'.")

    elif hasattr(aux_phot_kwd, '__iter__'):
        if not all([isinstance(phot, str) for phot in aux_phot_kwd]):
            raise TypeError("Argument 'aux_phot_kwd' must be a string, "
                            "list of strings, or None")
        aux_phot_kwd = [phot.strip().upper() for phot in aux_phot_kwd]
        if phot_kwd in aux_phot_kwd:
            raise ValueError("Auxiliary photometric keyword(s) must be "
                             "different from the main photometric keyword "
                             "'phot_kwd'.")

    else:
        raise TypeError("Argument 'aux_phot_kwd' must be a string, list of "
                        "strings, or None")

    # read input file list:
    fl = parseat.parse_cs_line(csline=files, default_ext=sci_ext4parse,
                               im_fmode='readonly' if readonly else 'update',
                               clobber=clobber, fnamesOnly=True,
                               doNotOpenDQ=True)

    # check if user supplied file extensions, set them to the sciext,
    # and warn that they will be ignored:
    for f in fl:
        if f.count > 1 or f.fext[0] != sci_ext4parse:
            _mlwarn("WARNING: Extension specifications for file {:s} "
                    "will be ignored. Using all {:s} extensions instead."
                    .format(f.image,
                            'image-like' if sciext is None else
                            "{:s}".format(utils.ext2str(
                                sciext, default_extver=None))))

    # find the reference PHOTFLAM/PHOTFNU:
    flc = fl[:]
    ref_hdu = None
    ref_ext = None
    ref_user = True

    if ref_phot is None:
        ref_user = False

        for f in flc:
            f.convert2ImageRef()

            # get primary hdu:
            pri_hdu = f.image.hdu[0]

            # find all valid extensions:
            if ref_phot_ext is None:
                if sciext == 'PRIMARY':
                    extnum = [0]
                else:
                    extnum = utils.get_ext_list(f.image, sciext)
                is_pri_hdu = [f.image.hdu[ext] is pri_hdu for ext in extnum]

                # if necessary, add primary header to the hdu list:
                if search_primary:
                    try:
                        pri_index = is_pri_hdu.index(True)
                        extnum.insert(0, extnum.pop(pri_index))
                    except ValueError:
                        extnum.insert(0, 0)
            else:
                extnum = [ref_phot_ext]

            for ext in extnum:
                hdu = f.image.hdu[ext]
                if phot_kwd in hdu.header:
                    ref_phot = hdu.header[phot_kwd]
                    ref_ext = ext
                    ref_hdu = hdu
                    break

            if ref_phot is None:
                _mlwarn("WARNING: Could not find specified inverse "
                        "sensitivity keyword '{:s}'\n"
                        "         in any of the {} extensions of file "
                        "'{}'.\n         This input file will be ignored."
                        .format(phot_kwd,
                                'image-like' if sciext is None else
                                "{:s}".format(utils.ext2str(
                                    sciext, default_extver=None)),
                                os.path.basename(f.image.original_fname)))
                f.release_all_images()
                fl.remove(f)
            else:
                break

    if ref_phot is None:
        raise RuntimeError("Could not find the inverse sensitivity keyword "
                           "'{:s}' in the specified headers of "
                           "the input image(s).\nCannot continue."
                           .format(phot_kwd))

    aux_phot_kwd_list = ','.join(aux_phot_kwd)

    _mlinfo("\nPRIMARY PHOTOMETRIC KEYWORD: {:s}".format(phot_kwd))
    _mlinfo("SECONDARY PHOTOMETRIC KEYWORD(S): {:s}"
            .format(aux_phot_kwd_list if aux_phot_kwd_list else 'None'))
    if ref_user:
        _mlinfo("REFERENCE VALUE PROVIDED BY USER: '{:s}'={}\n"
                .format(phot_kwd, ref_phot))
    else:
        _mlinfo("REFERENCE VALUE FROM FILE: '{:s}[{:s}]'\n"
                .format(os.path.basename(f.image.original_fname),
                        utils.ext2str(ref_ext)))
    _mlinfo("REFERENCE '{:s}' VALUE IS: {}".format(phot_kwd, ref_phot))

    # equalize PHOTFLAM/PHOTFNU
    for f in fl:
        # open the file if necessary:
        if f.fnamesOnly:
            _mlinfo("\nProcessing file '{:s}'".format(f.image))
            f.convert2ImageRef()
        else:
            _mlinfo("\nProcessing file '{:s}'"
                    .format(f.image.original_fname))

        # first, see if photflam is in the primary header and save this
        # value:
        pri_conv = None
        if search_primary:
            whdu = f.image.hdu[0]
            if phot_kwd in whdu.header:
                _mlinfo("   * Primary header:")
                if whdu is ref_hdu:
                    pri_conv = 1.0
                    _mlinfo("     - '{}' = {} found in the primary header."
                            .format(phot_kwd, whdu.header[phot_kwd]))
                    _mlinfo("     - Data conversion factor based on "
                            "primary header: {}".format(pri_conv))
                else:
                    _mlinfo("     - '{}' found in the primary header."
                            .format(phot_kwd))
                    pri_conv = whdu.header[phot_kwd] / ref_phot
                    _mlinfo("     - Setting {:s} in the primary header "
                            "to {} (old value was {})"
                            .format(phot_kwd, ref_phot,
                                    whdu.header[phot_kwd]))
                    _mlinfo("     - Data conversion factor based on "
                            "primary header: {}".format(pri_conv))
                    whdu.header[phot_kwd] = ref_phot

            # correct the "other" photometric keyword, if present:
            if pri_conv is not None and whdu is not ref_hdu:
                for aux_kwd in aux_phot_kwd:
                    if aux_kwd in whdu.header:
                        old_aux_phot = whdu.header[aux_kwd]
                        new_aux_phot = old_aux_phot / pri_conv
                        whdu.header[aux_kwd] = new_aux_phot
                        _mlinfo("     - Setting {:s} in the primary header "
                                "to {} (old value was {})"
                                .format(aux_kwd, new_aux_phot,
                                        old_aux_phot))

        # process data and error arrays when 'sciext' was specifically set
        # to 'PRIMARY':
        if sciext == 'PRIMARY' and pri_conv is not None:
            has_data = (hasattr(whdu, 'data') and whdu.data is not None)

            # correct data:
            if has_data:
                if np.issubdtype(whdu.data.dtype, np.floating):
                    whdu.data *= pri_conv
                    _mlinfo("     - Data have been multiplied by {}"
                            .format(pri_conv))
                else:
                    _mlwarn("WARNING: Data not converted because it is of "
                            "non-floating point type.")

            # correct error array:
            if errext is not None:
                eext = (errext, 1)
                try:
                    whdu = f.image.hdu[eext]
                except KeyError:
                    _mlwarn("     - WARNING: Error extension {:s} not "
                            "found.".format(utils.ext2str(eext)))
                    f.release_all_images()
                    continue

                if hasattr(whdu, 'data') and whdu.data is not None:
                    if np.issubdtype(whdu.data.dtype, np.floating):
                        whdu.data *= pri_conv
                        _mlinfo("     - Error array (ext={}) has been "
                                "multiplied by {}".format(eext, pri_conv))
                    else:
                        _mlinfo("     - Error array in extension {:s} "
                                "contains non-floating point data.\n"
                                "       Skipping this extension"
                                .format(utils.ext2str(eext)))

            f.release_all_images()
            continue

        # find all valid extensions:
        extnum = utils.get_ext_list(f.image, sciext)

        for ext in extnum:
            whdu = f.image.hdu[ext]
            conv = None

            if whdu is ref_hdu:
                _mlinfo("   * EXT: {} - This is the \"reference\" "
                        "extension.\n     Nothing to do. Skipping this "
                        "extension...".format(ext))
                continue

            has_data = (hasattr(whdu, 'data') and whdu.data is not None)
            if has_data and not np.issubdtype(whdu.data.dtype, np.floating):
                _mlinfo("   * EXT: {} contains non-floating point data. "
                        "Skipping this extension".format(ext))

            # find all auxiliary photometric keywords present in the header:
            paux = [aux_kwd for aux_kwd in aux_phot_kwd
                    if aux_kwd in whdu.header]

            if phot_kwd in whdu.header:
                _mlinfo("   * EXT: {}".format(ext))
                old_phot = whdu.header[phot_kwd]
                conv = old_phot / ref_phot
                _mlinfo("     - Setting {:s} to {} (old value was {})"
                        .format(phot_kwd, ref_phot, old_phot))
                whdu.header[phot_kwd] = ref_phot
                _mlinfo("     - Computed conversion factor for data: {}"
                        .format(conv))

            elif pri_conv is None:
                _mlinfo("   * EXT: {}".format(ext))
                _mlinfo("     - '{:s}' not found. Skipping this "
                        "extension...".format(phot_kwd))
                continue

            else:
                _mlinfo("   * EXT: {}".format(ext))
                # if paux:
                #     print("ERROR: Primary photometric keyword ('{:s}') "
                #           "is missing but\n       the secondary keywords "
                #           "('{:s}') are present. This extension cannot "
                #           "be processed."
                #           .format(phot_kwd, ','.join(paux)))
                #     continue
                _mlinfo("     - '{:s}' not found. Using conversion factor "
                        "based\n       on the primary header: {}"
                        .format(phot_kwd, pri_conv))
                conv = pri_conv

            # correct the "other" photometric keyword, if present:
            if conv is not None:
                for aux_kwd in paux:
                    old_aux_phot = whdu.header[aux_kwd]
                    new_aux_phot = old_aux_phot / conv
                    whdu.header[aux_kwd] = new_aux_phot
                    _mlinfo("     - Setting {:s} to {} (old value was {})"
                            .format(aux_kwd, new_aux_phot, old_aux_phot))

            # correct data:
            if has_data:
                if conv is None:
                    _mlinfo("   * EXT: {}".format(ext))

                if np.issubdtype(whdu.data.dtype, np.floating):
                    whdu.data *= conv
                    _mlinfo("     - Data have been multiplied by {}"
                            .format(conv))
                else:
                    _mlinfo("WARNING: Non-floating point data. Data cannot "
                            "be re-scaled.")

            # correct error array:
            if errext is not None and isinstance(ext, tuple) and \
               len(ext) == 2:
                eext = (errext, ext[1])
                try:
                    whdu = f.image.hdu[eext]
                except KeyError:
                    continue

                if hasattr(whdu, 'data') and whdu.data is not None:
                    if np.issubdtype(whdu.data.dtype, np.floating):
                        whdu.data *= conv
                        _mlinfo("     - Error array (ext={}) has been "
                                "multiplied by {}".format(eext, conv))
                    else:
                        _mlinfo("     - Error array in extension {:s} "
                                "contains non-floating point data.\n"
                                "       Skipping this extension"
                                .format(utils.ext2str(eext)))

        f.release_all_images()

    _mlinfo("\nDone.")

    if readonly:
        _mlinfo("\nNOTE: '{:s}' was run in READONLY mode\n"
                "      and input image(s)' content WAS NOT MODIFIED."
                .format(__taskname__))

    # close all log file handlers:
    for h in _log.handlers:
        if h is not _sh_log and isinstance(h, logging.FileHandler):
            h.close()
            _log.removeHandler(h)

#--------------------------
# TEAL Interface functions
#--------------------------


def run(configObj):
    logfile = configObj['logfile'] \
        if len(configObj['logfile'].strip()) > 0 else None

    photeq(files=configObj['files'],
           sciext=configObj['sciext'],
           errext=configObj['errext'],
           ref_phot=configObj['ref_phot'],
           ref_phot_ext=util.check_blank(configObj['ref_phot_ext']),
           phot_kwd=configObj['phot_kwd'],
           aux_phot_kwd=_split_kwd_list(configObj['aux_phot_kwd']),
           search_primary=configObj['search_primary'],
           readonly=configObj['readonly'],
           clobber=configObj['clobber'],
           logfile=logfile)


def _split_kwd_list(kwd_list):
    kwdl = kwd_list.split(',')
    newl = []
    for kwd in kwdl:
        k = kwd.strip()
        if k:
            newl.append(k)

    nkwd = len(newl)
    if nkwd == 0:
        return None
    elif nkwd == 1:
        return newl[0]
    else:
        return newl


def getHelpAsString(docstring=False, show_ver=True):
    """
    return useful help from a file in the script directory called
    __taskname__.help
    """
    install_dir = os.path.dirname(__file__)
    taskname = util.base_taskname(__taskname__, __package__)
    htmlfile = os.path.join(install_dir, 'htmlhelp', taskname + '.html')
    helpfile = os.path.join(install_dir, taskname + '.help')

    if docstring or (not docstring and not os.path.exists(htmlfile)):
        if show_ver:
            helpString = os.linesep + \
                ' '.join([__taskname__, 'Version', __version__,
                          ' updated on ', __version_date__]) + 2 * os.linesep
        else:
            helpString = ''

        if os.path.exists(helpfile):
            helpString += teal.getHelpFileAsString(taskname, __file__)
        else:
            if __doc__ is not None:
                helpString += __doc__ + os.linesep + photeq.__doc__
    else:
        helpString = 'file://' + htmlfile

    return helpString

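# A minimal illustration (not part of the original module) of how the
# comma-separated 'aux_phot_kwd' string from the TEAL interface is
# interpreted by _split_kwd_list() above; the keyword values are the
# examples used in photeq()'s docstring:
#
#     >>> _split_kwd_list('PHOTFNU, PHOTOHMY')
#     ['PHOTFNU', 'PHOTOHMY']
#     >>> _split_kwd_list('PHOTFNU')
#     'PHOTFNU'
#     >>> _split_kwd_list('')   # an empty/blank string yields None
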
def help(file=None):
    """
    Print out syntax help for running photeq

    Parameters
    ----------
    file : str (Default = None)
        If given, write out help to the filename specified by this
        parameter. Any previously existing file with this name will be
        deleted before writing out the help.

    """
    helpstr = getHelpAsString(docstring=True)

    if file is None:
        print(helpstr)
    else:
        if os.path.exists(file):
            os.remove(file)
        f = open(file, mode='w')
        f.write(helpstr)
        f.close()
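
# Illustrative usage of the help utility above (the output file name is
# hypothetical):
#
#     >>> from drizzlepac import photeq
#     >>> photeq.help()                        # print help to the console
#     >>> photeq.help(file='photeq_help.txt')  # write help to a file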