Source code for drizzlepac.wfpc2Data

"""
``wfpc2Data`` module provides classes used to import WFPC2 specific instrument data.

:Authors: Warren Hack, Ivo Busko, Christopher Hanley

:License: :doc:`/LICENSE`

"""
import copy
import os
import shutil
import glob

from astropy.io import fits
import numpy as np

from stsci.tools import fileutil, readgeis

from .imageObject import imageObject
from . import buildmask

# Define default public CRDS server URL to use in case user does not specify one in ``os.environ``
PUBLIC_CRDS_SERVER_URL = "https://hst-crds.stsci.edu"

# Translation table for any image that does not use the DQ extension of the MEF
# for the DQ array.
DQ_EXTNS = {'c0h': 'sdq', 'c0f': 'sci', 'c0m': 'sci'}

#### Calibrated gain and readnoise values for each chip
WFPC2_GAINS = {
    1: {7:[7.12,5.24],15:[13.99,7.02]},
    2: {7:[7.12,5.51],15:[14.50,7.84]},
    3: {7:[6.90,5.22],15:[13.95,6.99]},
    4: {7:[7.10,5.19],15:[13.95,8.32]}
}
WFPC2_DETECTOR_NAMES = {1: "PC", 2: "WF2", 3: "WF3", 4: "WF4"}

[docs] class WFPC2InputImage (imageObject): SEPARATOR = '_' flat_file_map = {} def __init__(self, filename, output=None, group=None): super().__init__(filename, output=output, group=group) # define the cosmic ray bits value to use in the dq array self.cr_bits_value = 4096 self._instrument=self._image["PRIMARY"].header["INSTRUME"] self._effGain = 1 self.errExt = None # Attribute defining the pixel dimensions of WFPC2 chips. self.full_shape = (800, 800) self.native_units = "COUNTS" self.flatkey = 'FLATFILE' # Reference Plate Scale used for updates to MDRIZSKY, we should get this from the wcs class #self.refplatescale = 0.0996 # arcsec / pixel for chip in range(1,self._numchips+1,1): self._assignSignature(chip) #this is used in the static mask self._image[self.scienceExt,chip].cte_dir = -1 # independent of amp, chip det=int(self._image[self.scienceExt,chip].header["DETECTOR"]) self._image[self.scienceExt,chip]._detector=WFPC2_DETECTOR_NAMES[det] self._image[self.scienceExt,chip].darkcurrent = self.getdarkcurrent(chip)
[docs] def find_DQ_extension(self): """ Return the suffix for the data quality extension and the name of the file which that DQ extension should be read from. """ dqfile = None # Look for additional file with DQ array, primarily for WFPC2 data indx = self._filename.find('.fits') _flt_file = False if self._filename.endswith('_flt.fits'): dqfile = self._filename _flt_file = True elif indx > 3: suffix = self._filename[indx-4:indx] dqfile = self._filename.replace(suffix[:3],'_c1') elif indx < 0 and len(self._filename) > 3 and \ self._filename[-4] == os.extsep and \ self._filename[-1].lower() == 'h': # assume we've got a GEIS file dqfile = self._filename[:-2]+'1'+self._filename[-1] hdulist = readgeis.readgeis(dqfile) prih = hdulist[0].header if 'FILETYPE' in prih: dq_suffix = prih['FILETYPE'].strip().upper() else: # assume extension name is 'SDQ' for WFPC2 GEIS files dq_suffix = 'SDQ' hdulist.close() return dqfile,dq_suffix else: raise ValueError("Input file {} does not appear to be neither " \ "a FITS file nor a GEIS file.".format(self._filename)) if os.path.exists(dqfile): if _flt_file: dq_suffix = 'DQ' else: dq_suffix = fits.getval(dqfile, "EXTNAME", ext=1, memmap=False) else: dq_suffix = "SCI" return dqfile, dq_suffix
[docs] def getEffGain(self): """ Method used to return the effective gain of a instrument's detector. Returns ------- gain : float The effective gain. """ return self._effGain
[docs] def setInstrumentParameters(self, instrpars): """ This method overrides the superclass to set default values into the parameter dictionary, in case empty entries are provided. """ pri_header = self._image[0].header self.proc_unit = instrpars['proc_unit'] instrpars['gnkeyword'] = 'ATODGAIN' # hard-code for WFPC2 data instrpars['rnkeyword'] = None if self._isNotValid (instrpars['exptime'], instrpars['expkeyword']): instrpars['expkeyword'] = 'EXPTIME' for chip in self.returnAllChips(extname=self.scienceExt): chip._headergain = self.getInstrParameter( instrpars['gain'], pri_header, instrpars['gnkeyword'] ) chip._exptime = self.getInstrParameter( instrpars['exptime'], pri_header, instrpars['expkeyword'] ) # We need to treat Read Noise as a special case since it is # not populated in the WFPC2 primary header if instrpars['rnkeyword'] is None: chip._rdnoise = None else: chip._rdnoise = self.getInstrParameter( instrpars['rdnoise'], pri_header, instrpars['rnkeyword'] ) if chip._headergain is None or chip._exptime is None: print('ERROR: invalid instrument task parameter') raise ValueError # We need to determine if the user has used the default readnoise/gain value # since if not, they will need to supply a gain/readnoise value as well usingDefaultGain = instrpars['gnkeyword'] == 'ATODGAIN' usingDefaultReadnoise = instrpars['rnkeyword'] in [None, 'None'] # If the user has specified either the readnoise or the gain, we need to make sure # that they have actually specified both values. In the default case, the readnoise # of the system depends on what the gain if usingDefaultReadnoise and usingDefaultGain: self._setchippars() elif usingDefaultReadnoise and not usingDefaultGain: raise ValueError("ERROR: You need to supply readnoise information\n when not using the default gain for WFPC2.") elif not usingDefaultReadnoise and usingDefaultGain: raise ValueError("ERROR: You need to supply gain information when\n not using the default readnoise for WFPC2.") else: # In this case, the user has specified both a gain and readnoise values. Just use them as is. for chip in self.returnAllChips(extname=self.scienceExt): chip._gain = chip._headergain print("Using user defined values for gain and readnoise") # Convert the science data to electrons self.doUnitConversions()
[docs] def getflat(self, chip, flat_file=None, flat_ext=None): """ Method for retrieving a detector's flat field. Parameters ---------- chip : int Chip number. Same as FITS ``EXTVER``. flat_file : str, None Flat field file name. If not specified, it will be determined automatically from image header. flat_ext : str, None Flat field extension name (same as FITS ``EXTNAME``). Specifies extension name containing flat field data. Returns ------- flat : numpy.ndarray The flat-field array in the same shape as the input image. """ # For the WFPC2 flat we need to invert # for use in Multidrizzle if flat_file is None: filename = fileutil.osfn(self._image["PRIMARY"].header[self.flatkey]) if filename in WFPC2InputImage.flat_file_map: flat_file, mef_flat_ext = WFPC2InputImage.flat_file_map[filename] else: h = fileutil.openImage(filename, mode='readonly', memmap=False) flat_file = h.filename() mef_flat_ext = h[0].header.get('FILETYPE', '') mef_flat_ext = h[1].header.get('EXTNAME', mef_flat_ext) h.close() WFPC2InputImage.flat_file_map[filename] = (flat_file, mef_flat_ext) if flat_ext is None: flat_ext = mef_flat_ext elif flat_ext is None: h = fileutil.openImage(flat_file, mode='readonly', memmap=False, writefits=False) flat_ext = h[0].header.get('FILETYPE', '') flat_ext = h[1].header.get('EXTNAME', flat_ext) h.close() flat = 1.0 / super().getflat(chip, flat_file, flat_ext) return flat
[docs] def doUnitConversions(self): """ Apply unit conversions to all the chips, ignoring the group parameter. This insures that all the chips get the same conversions when this gets done, even if only 1 chip was specified to be processed. """ # Image information _handle = fileutil.openImage(self._filename, mode='readonly', memmap=False) # Now convert the SCI array(s) units for det in range(1,self._numchips+1): chip=self._image[self.scienceExt,det] conversionFactor = 1.0 # add D2IMFILE to outputNames for removal by 'clean()' method later if 'D2IMFILE' in _handle[0].header and _handle[0].header['D2IMFILE'] not in ["","N/A"]: chip.outputNames['d2imfile'] = _handle[0].header['D2IMFILE'] if chip._gain is not None: """ # Multiply the values of the sci extension pixels by the gain. print "Converting %s[%d] from COUNTS to ELECTRONS"%(self._filename,det) # If the exptime is 0 the science image will be zeroed out. np.multiply(_handle[self.scienceExt,det].data,chip._gain,_handle[self.scienceExt,det].data) chip.data=_handle[self.scienceExt,det].data # Set the BUNIT keyword to 'electrons' chip._bunit = 'ELECTRONS' chip.header.update('BUNIT','ELECTRONS') _handle[self.scienceExt,det].header.update('BUNIT','ELECTRONS') # Update the PHOTFLAM value photflam = _handle[self.scienceExt,det].header['PHOTFLAM'] _handle[self.scienceExt,det].header.update('PHOTFLAM',(photflam/chip._gain)) """ conversionFactor = chip._gain chip._effGain = chip._gain #1. chip._conversionFactor = conversionFactor #1. else: msg = "Invalid gain value for data, no conversion done" print(msg) raise ValueError(msg) # Close the files and clean-up _handle.close() self._effGain = conversionFactor # 1.
[docs] def getdarkcurrent(self,exten): """ Return the dark current for the WFPC2 detector. This value will be contained within an instrument specific keyword. The value in the image header will be converted to units of electrons. Returns ------- darkcurrent : float Dark current for the WFPC3 detector in **units of counts/electrons**. """ darkrate = 0.005 # electrons / s if self.proc_unit == 'native': darkrate = darkrate / self.getGain(exten) #count/s try: chip = self._image[0] darkcurrent = chip.header['DARKTIME'] * darkrate except: msg = "#############################################\n" msg += "# #\n" msg += "# Error: #\n" msg += "# Cannot find the value for 'DARKTIME' #\n" msg += "# in the image header. WFPC2 input #\n" msg += "# images are expected to have this header #\n" msg += "# keyword. #\n" msg += "# #\n" msg += "# Error occured in the WFPC2InputImage class#\n" msg += "# #\n" msg += "#############################################\n" raise ValueError(msg) return darkcurrent
[docs] def getReadNoise(self, exten): """ Method for returning the readnoise of a detector (in counts). Returns ------- readnoise : float The readnoise of the detector in **units of counts/electrons**. """ rn = self._image[exten]._rdnoise if self.proc_unit == 'native': rn = self._rdnoise / self.getGain(exten) return rn
[docs] def buildMask(self, chip, bits=0, write=False): """ Build masks as specified in the user parameters found in the configObj object. """ sci_chip = self._image[self.scienceExt,chip] ### For WFPC2 Data, build mask files using: maskname = sci_chip.dqrootname+'_dqmask.fits' dqmask_name = buildmask.buildShadowMaskImage(sci_chip.dqfile,sci_chip.detnum,sci_chip.extnum,maskname,bitvalue=bits,binned=sci_chip.binned) sci_chip.dqmaskname = dqmask_name sci_chip.outputNames['dqmask'] = dqmask_name sci_chip.outputNames['tmpmask'] = 'wfpc2_inmask%d.fits'%(sci_chip.detnum) dqmask = fits.getdata(dqmask_name, ext=0, memmap=False) return dqmask
def _assignSignature(self, chip): """assign a unique signature for the image based on the instrument, detector, chip, and size this will be used to uniquely identify the appropriate static mask for the image this also records the filename for the static mask to the outputNames dictionary """ sci_chip = self._image[self.scienceExt,chip] ny = sci_chip._naxis1 nx = sci_chip._naxis2 detnum = sci_chip.detnum sig = (self.outroot, (nx, ny), chip) #signature is a tuple sci_chip.signature=sig #signature is a tuple def _setchippars(self): for chip in self.returnAllChips(extname=self.scienceExt): try: chip._gain,chip._rdnoise = WFPC2_GAINS[chip.detnum][chip._headergain] except KeyError: raise ValueError("! Header gain value is not valid for WFPC2")
# ---------------------------------------------------------------------------- # Functions to convert WFPC2 C0M/C1M files into a single MEF FLT file # # ---------------------------------------------------------------------------- def wfpc2_to_flt(imgname): """Convert separate GEIS-based FITS files into single FLT file Parameters ---------- imgname : str Filename of calibrated WFPC2 SCI (*_c0m.fits) image Returns ------- flt_filename : str Filename of WFPC2 MEF _*flt.fits file that was written out """ is_mef = 'c0m' in imgname if not is_mef: raise TypeError("MEF C0M file needed as input.") dq_file = imgname.replace('c0m', 'c1m') is_dq = os.path.exists(dq_file) flt_filename = imgname.replace('c0m', 'flt') # Read in input SCI file in_sci = fits.open(imgname) # Add keywords to be more compatible with ACS and WFC3 data num_sci = fileutil.countExtn(imgname) det_name = 'PC' in_sci[0].header['DETECTOR'] = det_name in_sci[0].header['PRIMESI'] = det_name if is_dq: # Read in existing input DQ file in_dq = fits.open(dq_file) dq_extns = [extn for extn in in_dq[1:]] else: # Could not find a DQ file, so create empty DQ arrays # based on SCI arrays dq_extns = [extn for extn in copy.deepcopy(in_sci[1:])] for extn in dq_extns: extn.data = np.zeros(extn.data.shape, dtype=np.int32) # Update EXTNAME to be consistent with ACS and WFC3 DQ extname for i,extn in enumerate(dq_extns): extn.header['extname'] = 'DQ' extn.header['extver'] = i+1 # Now create ERR arrays as well... err_extns =[extn for extn in copy.deepcopy(in_sci[1:])] for i, extn in enumerate(err_extns): # Initialize using Poisson error estimate extn.data = np.sqrt(extn.data) extn.header['extname'] = 'ERR' extn.header['extver'] = i+1 # Create output FLT file now to avoid having astropy # create a tmp* file that doesn't always get cleaned up... out_hdu = copy.deepcopy(in_sci) fname_kw = out_hdu[0].header['filename'] out_hdu[0].header['filename'] = f"{fname_kw[:-8]}flt.fits" for dq_extn, err_extn in zip(dq_extns, err_extns): out_hdu.append(dq_extn) out_hdu.append(err_extn) print(f"Writing out {flt_filename}") out_file = open(flt_filename, 'wb') out_hdu.writeto(out_file, overwrite=True) in_sci.close() del in_sci if is_dq: in_dq.close() return flt_filename # ------------------------------------------------------ # Function for updating headers to latest # reference files from CRDS # ------------------------------------------------------ def apply_bestrefs(filename=None, dirname=None, uref_path=None, crds_path=None, reftypes=['idctab', 'dgeofile', 'offtab']): """Update WFPC2 data to use the latest reference files from CRDS .. note:: See `https://hst-crds.stsci.edu/docs/cmdline_bestrefs/ <https://hst-crds.stsci.edu/docs/cmdline_bestrefs/>`_ for details on how to configure CRDS for your local system and for definitions of all the environment variables used by CRDS. Parameters ---------- filename : str, optional Filename of input file to be updated. If not specified, **all RAW and calibrated files** from the current directory, or ``dirname`` directory if given, will be updated. dirname : str, optional Name of directory containing WFPC2 data to be updated. If not specified, current directory will be checked. uref_path : str, optional Path for ``uref`` directory on local system. If not provided, the one defined in `os.environ` will be used. crds_path : str Path for the ``CRDS_PATH`` directory on local system. .. warning:: If no value is specified and ``CRDS_PATH`` is not already defined locally, this function will create a temporary ``CRDS_PATH`` directory tree under the directory with the files to be processed, populate it with the latest reference files and mappings needed by CRDS, then delete it when done. reftypes : list, optional List of reference files to be updated. If None or an empty list, all reference files will be updated. """ if reftypes is None: reftypes = [] starting_dir = os.getcwd() wfpc2_dir = dirname if dirname else starting_dir os.chdir(wfpc2_dir) c0m = [] if filename: if os.path.exists(filename): # User specified a single filename to process (default case) d0m = [filename] elif '*' in filename: # User specified a wild-carded filename to use as input d0m = sorted(glob.glob(filename)) else: # Single input filename provided that could not be found os.chdir(starting_dir) raise ValueError(f"WFPC2 image {filename} not found in {wfpc2_dir}") else: # Get the list of ALL WFPC2 images from the specified directory c0m = sorted(glob.glob("*c0m.fits")) d0m = sorted(glob.glob("*d0m.fits")) # Make sure we are in a directory with WFPC2 images if len(d0m) == 0: # Return to original starting directory os.chdir(starting_dir) raise ValueError(f"ERROR: No WFPC2 data in {wfpc2_dir}") orig_crds = {'CRDS_PATH': os.environ.get('CRDS_PATH'), 'uref': os.environ.get('uref'), 'CRDS_SERVER_URL': os.environ.get('CRDS_SERVER_URL'), 'CRDS_OBSERVATORY': os.environ.get('CRDS_OBSERVATORY')} # Now, define what CRDS directories will be used for this update... remove_local_cache = False sync_refs = False if os.environ.get('CRDS_READONLY_CACHE') == '1' else True if not orig_crds['CRDS_PATH']: # User has not set up any local CRDS cache, so # we need to define one under the current working directory crds_cache = os.path.join(wfpc2_dir, 'crds_cache') crds_hst_cache = os.path.join(crds_cache, 'references', 'hst') crds_map_cache = os.path.join(crds_cache, 'mappings', 'hst') crds_uref_path = uref_path if uref_path else os.path.join(crds_hst_cache, 'wfpc2', os.sep) if not os.path.exists(crds_hst_cache): os.makedirs(crds_hst_cache) print(f"Creating temporary CRDS cache directory: {crds_hst_cache}") if not os.path.exists(crds_map_cache): os.makedirs(crds_map_cache) print(f"Creating temporary CRDS MAP cache directory: {crds_map_cache}") remove_local_cache = True else: # User has already defined a local CRDS cache, *and* it already exists, so use it. if crds_path: crds_cache = crds_path else: crds_cache = orig_crds['CRDS_PATH'] if os.path.exists(orig_crds['CRDS_PATH']) else None crds_hst_cache = os.path.join(crds_cache, 'references', 'hst') if os.path.exists(orig_crds['uref']): # Use the `uref` defined by the user in their `os.environ` crds_uref_path = orig_crds['uref'] else: # User wants to explicitly use a CRDS cache they specify on input # and default to a path based on CRDS_PATH if `uref_path` is not # actually passed in through this function crds_uref_path = uref_path if uref_path else os.path.join(crds_hst_cache, 'wfpc2', os.sep) # `orig_crds["CRDS_PATH"]` has been defined, now verify it is valid... if not os.path.exists(crds_cache): # If CRDS_PATH was defined, but does not exists, raise an Exception # so that the user can finish setting up their CRDS cache properly. os.chdir(starting_dir) raise EnvironmentError(f"CRDS_PATH was defined as {crds_cache}, but does not exist!") if not os.path.exists(crds_uref_path): # If CRDS_PATH was defined, but does not exists, raise an Exception # so that the user can finish setting up their CRDS cache properly. os.chdir(starting_dir) raise EnvironmentError(f"CRDS path to `uref` was defined as {crds_uref_path}, but does not exist!") # Now that we have confirmed we have images to update... # configure CRDS for use in updating the WFPC2 data os.environ['CRDS_SERVER_URL'] = orig_crds['CRDS_SERVER_URL'] if orig_crds['CRDS_SERVER_URL'] else PUBLIC_CRDS_SERVER_URL os.environ['CRDS_OBSERVATORY'] = "hst" os.environ['CRDS_PATH'] = crds_cache os.environ['uref'] = crds_uref_path # os.environ['CRDS_PATH'] = "D:\data\crds_cache" # os.environ['uref'] = "D:\\data\\crds_cache\\references\\hst\\wfpc2\\" # Only import this package if there is data to be updated import crds print(f"Running CRDS.assign_bestrefs on: {d0m} for reftypes={reftypes}") # Apply bestrefs to images after downloading references to local CRDS cache crds.assign_bestrefs(d0m, reftypes=reftypes, sync_references=sync_refs) if len(c0m) > 0: crds.assign_bestrefs(c0m, reftypes=reftypes, sync_references=sync_refs) # clean up temp crds_cache dir, if created if remove_local_cache: shutil.rmtree(crds_cache) # Now, revert os.environ to original state prior to running this function for crds_key, crds_val in orig_crds.items(): if crds_val is None: # If it was originally None, then # there was no key defined originally, so delete it. del os.environ[crds_key] else: os.environ[crds_key] = crds_val # Return to original starting directory os.chdir(starting_dir)