Source code for drizzlepac.imageObject

"""
A class which makes image objects for each input filename.

:Authors: Warren Hack

:License: :doc:`LICENSE`

"""
import copy, os, re, sys

import numpy as np
from stwcs import distortion

from stsci.tools import fileutil, logutil, textutil
from astropy.io import fits
from . import util
from . import wcs_functions
from . import buildmask
from .version import *

__all__ = ['baseImageObject', 'imageObject', 'WCSObject']


log = logutil.create_logger(__name__, level=logutil.logging.NOTSET)


_NUMPY_TO_IRAF_DTYPES = {'float64': -64, 'float32': -32, 'uint8': 8,
                         'int16': 16, 'int32': 32, 'int64': 64}

_IRAF_DTYPES_TO_NUMPY = {-64: 'float64', -32: 'float32', 8: 'uint8',
                         16: 'int16', 32: 'int32', 64: 'int64'}


[docs]class baseImageObject: """ Base ImageObject which defines the primary set of methods. """ def __init__(self,filename): """ """ self.scienceExt= "SCI" # the extension the science image is stored in self.maskExt="DQ" #the extension with the mask image in it self.errExt = "ERR" # the extension the ERR array can be found in self._filename = filename self._original_file_name = filename self.native_units='ELECTRONS' self.flatkey = None # keyword which points to flat-field reference file self._image = None self._instrument=None self._rootname=None self.outputNames={} self.outputValues = {} self.createContext = True self.inmemory = False # flag for all in-memory operations #this is the number of science chips to be processed in the file self._numchips=1 self._nextend=0 # this is the number of chip which will be combined based on 'group' parameter self._nmembers = 0 def __getitem__(self,exten): """ Overload getitem to return the data and header these only work on the HDU list already in memory once the data has been zero's in self._image you should use getData or getHeader to re-read the file. """ return fileutil.getExtn(self._image,extn=exten) def __cmp__(self, other): """ Overload the comparison operator just to check the filename of the object? """ return (isinstance(other, imageObject) and self._filename == other._filename) def _isNotValid(self, par1, par2): """ Method used to determine if a value or keyword is supplied as input for instrument specific parameters. """ invalidValues = [None,'None','INDEF',''] return (par1 in invalidValues and par2 in invalidValues)
[docs] def info(self): """ Return fits information on the _image. """ #if the file hasn't been closed yet then we can #use the fits info which looks at the extensions #if(self._isSimpleFits): # print self._filename," is a simple fits image" #else: self._image.info()
[docs] def close(self): """ Close the object nicely and release all the data arrays from memory YOU CANT GET IT BACK, the pointers and data are gone so use the getData method to get the data array returned for future use. You can use putData to reattach a new data array to the imageObject. """ if self._image is None: return # mcara: I think the code below is not necessary but in order to # preserve the same functionality as the code removed below, # I make an empty copy of the image object: empty_image = fits.HDUList() for u in self._image: empty_image.append(u.__class__(data=None, header=None)) # mcara: END unnecessary code self._image.close() #calls fits.close() self._image = empty_image
#we actuallly want to make sure that all the #data extensions have been closed and deleted #since we could have the DQ,ERR and others read in #at this point, but I'd like there to be something #valid there afterwards that I can play with # mcara: REMOVED unnecessary code: # # if not self._isSimpleFits: # for ext,hdu in enumerate(self._image): # #use the datatype for the extension # #dtype=self.getNumpyType(hdu.header["BITPIX"]) # hdu.data = None #np.array(0,dtype=dtype) #so we dont get io errors on stuff that wasn't read in yet # else: # self._image.data= None # np.array(0,dtype=self.getNumpyType(self._image.header["BITPIX"]))
[docs] def clean(self): """ Deletes intermediate products generated for this imageObject. """ clean_files = ['blotImage','crmaskImage','finalMask', 'staticMask','singleDrizMask','outSky', 'outSContext','outSWeight','outSingle', 'outMedian','dqmask','tmpmask', 'skyMatchMask'] log.info('Removing intermediate files for %s' % self._filename) # We need to remove the combined products first; namely, median image util.removeFileSafely(self.outputNames['outMedian']) # Now remove chip-specific intermediate files, if any were created. for chip in self.returnAllChips(extname='SCI'): for fname in clean_files: if fname in chip.outputNames: util.removeFileSafely(chip.outputNames[fname])
[docs] def getData(self,exten=None): """ Return just the data array from the specified extension fileutil is used instead of fits to account for non- FITS input images. openImage returns a fits object. """ if exten.lower().find('sci') > -1: # For SCI extensions, the current file will have the data fname = self._filename else: # otherwise, the data being requested may need to come from a # separate file, as is the case with WFPC2 DQ data. # # convert exten to 'sci',extver to get the DQ info for that chip extn = exten.split(',') sci_chip = self._image[self.scienceExt,int(extn[1])] fname = sci_chip.dqfile extnum = self._interpretExten(exten) if self._image[extnum].data is None: if os.path.exists(fname): _image=fileutil.openImage(fname, clobber=False, memmap=False) _data=fileutil.getExtn(_image, extn=exten).data _image.close() del _image self._image[extnum].data = _data else: _data = None else: _data = self._image[extnum].data return _data
[docs] def getHeader(self,exten=None): """ Return just the specified header extension fileutil is used instead of fits to account for non-FITS input images. openImage returns a fits object. """ _image=fileutil.openImage(self._filename, clobber=False, memmap=False) _header=fileutil.getExtn(_image,extn=exten).header _image.close() del _image return _header
def _interpretExten(self,exten): #check if the exten is a string or number and translate to the correct chip _extnum=0 if ',' in str(exten): #assume a string like "sci,1" has been given _extensplit=exten.split(',') _extname=_extensplit[0] _extver=int(_extensplit[1]) _extnum=self.findExtNum(_extname,_extver) else: #assume that a direct extnum has been given _extnum=int(exten) if _extnum is None: msg = "no extension number found" log.error(msg) raise ValueError(msg) return _extnum
[docs] def updateData(self,exten,data): """ Write out updated data and header to the original input file for this object. """ _extnum=self._interpretExten(exten) fimg = fileutil.openImage(self._filename, mode='update', memmap=False) fimg[_extnum].data = data fimg[_extnum].header = self._image[_extnum].header fimg.close()
[docs] def putData(self,data=None,exten=None): """ Now that we are removing the data from the object to save memory, we need something that cleanly puts the data array back into the object so that we can write out everything together using something like fits.writeto....this method is an attempt to make sure that when you add an array back to the .data section of the hdu it still matches the header information for that section ( ie. update the bitpix to reflect the datatype of the array you are adding). The other header stuff is up to you to verify. Data should be the data array exten is where you want to stick it, either extension number or a string like 'sci,1' """ if data is None: log.warning("No data supplied") else: extnum = _interpretExten(exten) ext = self._image[extnum] # update the bitpix to the current datatype, this aint fancy and # ignores bscale ext.header['BITPIX'] = _NUMPY_TO_IRAF_DTYPES[data.dtype.name] ext.data = data
[docs] def getAllData(self,extname=None,exclude=None): """ This function is meant to make it easier to attach ALL the data extensions of the image object so that we can write out copies of the original image nicer. If no extname is given, the it retrieves all data from the original file and attaches it. Otherwise, give the name of the extensions you want and all of those will be restored. Ok, I added another option. If you want to get all the data extensions EXCEPT a particular one, leave extname=NONE and set exclude=EXTNAME. This is helpfull cause you might not know all the extnames the image has, this will find out and exclude the one you do not want overwritten. """ extensions = self._findExtnames(extname=extname,exclude=exclude) for i in range(1,self._nextend+1,1): if hasattr(self._image[i],'_extension') and \ "IMAGE" in self._image[i]._extension: extver = self._image[i].header['extver'] if (self._image[i].extname in extensions) and self._image[self.scienceExt,extver].group_member: self._image[i].data=self.getData(self._image[i].extname + ','+str(self._image[i].extver))
[docs] def returnAllChips(self,extname=None,exclude=None): """ Returns a list containing all the chips which match the extname given minus those specified for exclusion (if any). """ extensions = self._findExtnames(extname=extname,exclude=exclude) chiplist = [] for i in range(1,self._nextend+1,1): if 'extver' in self._image[i].header: extver = self._image[i].header['extver'] else: extver = 1 if hasattr(self._image[i],'_extension') and \ "IMAGE" in self._image[i]._extension: if (self._image[i].extname in extensions) and self._image[self.scienceExt,extver].group_member: chiplist.append(self._image[i]) return chiplist
def _findExtnames(self, extname=None, exclude=None): """ This method builds a list of all extensions which have 'EXTNAME'==extname and do not include any extensions with 'EXTNAME'==exclude, if any are specified for exclusion at all. """ #make a list of the available extension names for the object extensions=[] if extname is not None: if not isinstance(extname,list): extname=[extname] for extn in extname: extensions.append(extn.upper()) else: #restore all the extensions data from the original file, be careful here #if you've altered data in memory you want to keep! for i in range(1,self._nextend+1,1): if hasattr(self._image[i],'_extension') and \ "IMAGE" in self._image[i]._extension: if self._image[i].extname.upper() not in extensions: extensions.append(self._image[i].extname) #remove this extension from the list if exclude is not None: exclude.upper() if exclude in extensions: newExt=[] for item in extensions: if item != exclude: newExt.append(item) extensions=newExt del newExt return extensions
[docs] def findExtNum(self, extname=None, extver=1): """Find the extension number of the give extname and extver.""" extnum = None extname = extname.upper() if not self._isSimpleFits: for ext in self._image: if (hasattr(ext,'_extension') and 'IMAGE' in ext._extension and (ext.extname == extname) and (ext.extver == extver)): extnum = ext.extnum else: log.info("Image is simple fits") return extnum
def _assignRootname(self, chip): """ Assign a unique rootname for the image based in the expname. """ extname=self._image[self.scienceExt,chip].header["EXTNAME"].lower() extver=self._image[self.scienceExt,chip].header["EXTVER"] expname = self._rootname # record extension-based name to reflect what extension a mask file corresponds to self._image[self.scienceExt,chip].rootname=expname + "_" + extname + str(extver) self._image[self.scienceExt,chip].sciname=self._filename + "[" + extname +","+str(extver)+"]" self._image[self.scienceExt,chip].dqrootname=self._rootname + "_" + extname + str(extver) # Needed to keep EXPNAMEs associated properly (1 EXPNAME for all chips) self._image[self.scienceExt,chip]._expname=expname self._image[self.scienceExt,chip]._chip =chip def _setOutputNames(self,rootname,suffix='_drz'): """ Define the default output filenames for drizzle products, these are based on the original rootname of the image filename should be just 1 filename, so call this in a loop for chip names contained inside a file. """ # Define FITS output filenames for intermediate products # Build names based on final DRIZZLE output name # where 'output' normally would have been created # by 'process_input()' # outFinal = rootname+suffix+'.fits' outSci = rootname+suffix+'_sci.fits' outWeight = rootname+suffix+'_wht.fits' outContext = rootname+suffix+'_ctx.fits' outMedian = rootname+'_med.fits' # Build names based on input name origFilename = self._filename.replace('.fits','_OrIg.fits') outSky = rootname + '_sky.fits' outSingle = rootname+'_single_sci.fits' outSWeight = rootname+'_single_wht.fits' crCorImage = rootname+'_crclean.fits' # Build outputNames dictionary fnames={ 'origFilename': origFilename, 'outFinal': outFinal, 'outMedian': outMedian, 'outSci': outSci, 'outWeight': outWeight, 'outContext': outContext, 'outSingle': outSingle, 'outSWeight': outSWeight, 'outSContext': None, 'outSky': outSky, 'crcorImage': crCorImage, 'ivmFile': None } return fnames def _setChipOutputNames(self,rootname,chip): blotImage = rootname + '_blt.fits' crmaskImage = rootname + '_crmask.fits' # Start with global names fnames = self.outputNames # Now add chip-specific entries fnames['blotImage'] = blotImage fnames['crmaskImage'] = crmaskImage sci_chip = self._image[self.scienceExt,chip] # Define mask names as additional entries into outputNames dictionary fnames['finalMask']=sci_chip.dqrootname+'_final_mask.fits' # used by final_drizzle fnames['singleDrizMask']=fnames['finalMask'].replace('final','single') fnames['staticMask']=None # Add the following entries for use in creating outputImage object fnames['data'] = sci_chip.sciname return fnames ############################################################## # # Methods related to managing virtual intermediate output products # as opposed to writing them out as files on disk # ############################################################## def _initVirtualOutputs(self): """ Sets up the structure to hold all the output data arrays for this image in memory. """ self.virtualOutputs = {} for product in self.outputNames: self.virtualOutputs[product] = None
[docs] def saveVirtualOutputs(self,outdict): """ Assign in-memory versions of generated products for this ``imageObject`` based on dictionary 'outdict'. """ if not self.inmemory: return for outname in outdict: self.virtualOutputs[outname] = outdict[outname]
[docs] def getOutputName(self,name): """ Return the name of the file or PyFITS object associated with that name, depending on the setting of self.inmemory. """ val = self.outputNames[name] if self.inmemory: # if inmemory was turned on... # return virtualOutput object saved with that name val = self.virtualOutputs[val] return val
############################################################## # Methods for managing output values associated with this input ##############################################################
[docs] def updateOutputValues(self,output_wcs): """ Copy info from output WCSObject into outputnames for each chip for use in creating outputimage object. """ outputvals = self.outputValues outputvals['output'] = output_wcs.outputNames['outFinal'] outputvals['outnx'], outputvals['outny'] = output_wcs.wcs.pixel_shape outputvals['texptime'] = output_wcs._exptime outputvals['texpstart'] = output_wcs._expstart outputvals['texpend'] = output_wcs._expend outputvals['nimages'] = output_wcs.nimages outputvals['scale'] = output_wcs.wcs.pscale #/ self._image[self.scienceExt,1].wcs.pscale outputvals['exptime'] = self._exptime outnames = self.outputNames outnames['outMedian'] = output_wcs.outputNames['outMedian'] outnames['outFinal'] = output_wcs.outputNames['outFinal'] outnames['outSci'] = output_wcs.outputNames['outSci'] outnames['outWeight'] = output_wcs.outputNames['outWeight'] outnames['outContext'] = output_wcs.outputNames['outContext']
[docs] def updateContextImage(self, contextpar): """ Reset the name of the context image to `None` if parameter ``context`` is `False`. """ self.createContext = contextpar if not contextpar: log.info('No context image will be created for %s' % self._filename) self.outputNames['outContext'] = None
[docs] def find_DQ_extension(self): """ Return the suffix for the data quality extension and the name of the file which that DQ extension should be read from. """ dqfile = None dq_suffix=None if(self.maskExt is not None): for hdu in self._image: # Look for DQ extension in input file if 'extname' in hdu.header and hdu.header['extname'].lower() == self.maskExt.lower(): dqfile = self._filename dq_suffix=self.maskExt break return dqfile,dq_suffix
[docs] def getKeywordList(self, kw): """ Return lists of all attribute values for all active chips in the ``imageObject``. """ kwlist = [] for chip in range(1,self._numchips+1,1): sci_chip = self._image[self.scienceExt,chip] if sci_chip.group_member: kwlist.append(sci_chip.__dict__[kw]) return kwlist
[docs] def getGain(self, exten): return self._image[exten]._gain
[docs] def getflat(self, chip): """ Method for retrieving a detector's flat field. Returns ------- flat: array This method will return an array the same shape as the image in **units of electrons**. """ sci_chip = self._image[self.scienceExt, chip] # The keyword for ACS flat fields in the primary header of the flt # file is pfltfile. This flat file is already in the required # units of electrons. # The use of fileutil.osfn interprets any environment variable, such as # jref$, used in the specification of the reference filename filename = fileutil.osfn(self._image["PRIMARY"].header[self.flatkey]) hdulist = None try: hdulist = fileutil.openImage(filename, mode='readonly', memmap=False) data = hdulist[(self.scienceExt, chip)].data if data.shape[0] != sci_chip.image_shape[0]: ltv2 = int(np.round(sci_chip.ltv2)) else: ltv2 = 0 size2 = sci_chip.image_shape[0] + ltv2 if data.shape[1] != sci_chip.image_shape[1]: ltv1 = int(np.round(sci_chip.ltv1)) else: ltv1 = 0 size1 = sci_chip.image_shape[1] + ltv1 flat = data[ltv2:size2, ltv1:size1] except FileNotFoundError: flat = np.ones(sci_chip.image_shape, dtype=sci_chip.image_dtype) log.warning("Cannot find flat field file '{}'".format(filename)) log.warning("Treating flatfield as a constant value of '1'.") finally: if hdulist is not None: hdulist.close() return flat
[docs] def getReadNoiseImage(self, chip): """ Notes ===== Method for returning the readnoise image of a detector (in electrons). The method will return an array of the same shape as the image. :units: electrons """ sci_chip = self._image[self.scienceExt,chip] return np.ones(sci_chip.image_shape,dtype=sci_chip.image_dtype) * sci_chip._rdnoise
[docs] def getexptimeimg(self,chip): """ Notes ===== Return an array representing the exposure time per pixel for the detector. This method will be overloaded for IR detectors which have their own EXP arrays, namely, WFC3/IR and NICMOS images. :units: None Returns ======= exptimeimg : numpy array The method will return an array of the same shape as the image. """ sci_chip = self._image[self.scienceExt,chip] if sci_chip._wtscl_par == 'expsq': wtscl = sci_chip._exptime*sci_chip._exptime else: wtscl = sci_chip._exptime return np.ones(sci_chip.image_shape,dtype=sci_chip.image_dtype)*wtscl
[docs] def getdarkimg(self,chip): """ Notes ===== Return an array representing the dark image for the detector. The method will return an array of the same shape as the image. :units: electrons """ sci_chip = self._image[self.scienceExt,chip] return np.ones(sci_chip.image_shape,dtype=sci_chip.image_dtype)*sci_chip.darkcurrent
[docs] def getskyimg(self,chip): """ Notes ===== Return an array representing the sky image for the detector. The value of the sky is what would actually be subtracted from the exposure by the skysub step. :units: electrons """ sci_chip = self._image[self.scienceExt,chip] return np.ones(sci_chip.image_shape,dtype=sci_chip.image_dtype)*sci_chip.subtractedSky
[docs] def getdarkcurrent(self): """ Notes ===== Return the dark current for the detector. This value will be contained within an instrument specific keyword. The value in the image header will be converted to units of electrons. :units: electrons """ pass
# the following two functions are basically doing the same thing, # how are they used differently in the code?
[docs] def getExtensions(self, extname='SCI', section=None): """ Return the list of EXTVER values for extensions with name specified in extname. """ if section is None: numext = 0 section = [] for hdu in self._image: if 'extname' in hdu.header and hdu.header['extname'] == extname: section.append(hdu.header['extver']) else: if not isinstance(section,list): section = [section] return section
def _countEXT(self,extname="SCI"): """ Count the number of extensions in the file with the given name (``EXTNAME``). """ count=0 #simple fits image if (self._image['PRIMARY'].header["EXTEND"]): for i,hdu in enumerate(self._image): if i > 0: hduExtname = False if 'EXTNAME' in hdu.header: self._image[i].extnum=i self._image[i].extname=hdu.header["EXTNAME"] hduExtname = True if 'EXTVER' in hdu.header: self._image[i].extver=hdu.header["EXTVER"] else: self._image[i].extver = 1 if ((extname is not None) and \ (hduExtname and (hdu.header["EXTNAME"] == extname))) \ or extname is None: count=count+1 return count
[docs] def getNumpyType(self, irafType): """ Return the corresponding numpy data type. """ return _IRAF_DTYPES_TO_NUMPY[irafType]
[docs] def buildMask(self,chip,bits=0,write=False): """ Build masks as specified in the user parameters found in the configObj object. We should overload this function in the instrument specific implementations so that we can add other stuff to the badpixel mask? Like vignetting areas and chip boundries in nicmos which are camera dependent? these are not defined in the DQ masks, but should be masked out to get the best results in multidrizzle. """ dqarr = self.getData(exten=self.maskExt+','+str(chip)) dqmask = buildmask.buildMask(dqarr,bits) if write: phdu = fits.PrimaryHDU(data=dqmask,header=self._image[self.maskExt,chip].header) dqmask_name = self._image[self.scienceExt,chip].dqrootname+'_dqmask.fits' log.info('Writing out DQ/weight mask: %s' % dqmask_name) if os.path.exists(dqmask_name): os.remove(dqmask_name) phdu.writeto(dqmask_name) del phdu self._image[self.scienceExt,chip].dqmaskname = dqmask_name # record the name of this mask file that was created for later # removal by the 'clean()' method self._image[self.scienceExt,chip].outputNames['dqmask'] = dqmask_name del dqarr return dqmask
[docs] def buildEXPmask(self, chip, dqarr): """ Builds a weight mask from an input DQ array and the exposure time per pixel for this chip. """ log.info("Applying EXPTIME weighting to DQ mask for chip %s" % chip) #exparr = self.getexptimeimg(chip) exparr = self._image[self.scienceExt,chip]._exptime expmask = exparr*dqarr return expmask.astype(np.float32)
[docs] def buildIVMmask(self ,chip, dqarr, scale): """ Builds a weight mask from an input DQ array and either an IVM array provided by the user or a self-generated IVM array derived from the flat-field reference file associated with the input image. """ sci_chip = self._image[self.scienceExt,chip] ivmname = self.outputNames['ivmFile'] if ivmname is not None: log.info("Applying user supplied IVM files for chip %s" % chip) #Parse the input file name to get the extension we are working on extn = "IVM,{}".format(chip) #Open the mask image for updating and the IVM image ivm = fileutil.openImage(ivmname, mode='readonly', memmap=False) ivmfile = fileutil.getExtn(ivm, extn) # Multiply the IVM file by the input mask in place. ivmarr = ivmfile.data * dqarr ivm.close() else: log.info("Automatically creating IVM files for chip %s" % chip) # If no IVM files were provided by the user we will # need to automatically generate them based upon # instrument specific information. flat = self.getflat(chip) RN = self.getReadNoiseImage(chip) darkimg = self.getdarkimg(chip) skyimg = self.getskyimg(chip) #exptime = self.getexptimeimg(chip) #exptime = sci_chip._exptime #ivm = (flat*exptime)**2/(darkimg+(skyimg*flat)+RN**2) ivm = (flat)**2/(darkimg+(skyimg*flat)+RN**2) # Multiply the IVM file by the input mask in place. ivmarr = ivm * dqarr # Update 'wt_scl' parameter to match use of IVM file sci_chip._wtscl = pow(sci_chip._exptime,2)/pow(scale,4) #sci_chip._wtscl = 1.0/pow(scale,4) return ivmarr.astype(np.float32)
[docs] def buildERRmask(self,chip,dqarr,scale): """ Builds a weight mask from an input DQ array and an ERR array associated with the input image. """ sci_chip = self._image[self.scienceExt,chip] # Set default value in case of error, or lack of ERR array errmask = dqarr if self.errExt is not None: try: # Attempt to open the ERR image. err = self.getData(exten=self.errExt+','+str(chip)) log.info("Applying ERR weighting to DQ mask for chip %s" % chip) # Multiply the scaled ERR file by the input mask in place. #exptime = self.getexptimeimg(chip) exptime = sci_chip._exptime errmask = (exptime/err)**2 * dqarr # Update 'wt_scl' parameter to match use of IVM file #sci_chip._wtscl = pow(sci_chip._exptime,2)/pow(scale,4) sci_chip._wtscl = 1.0/pow(scale,4) del err except: # We cannot find an 'ERR' extension and the data isn't WFPC2. # Print a generic warning message and continue on with the # final drizzle step. print(textutil.textbox( 'WARNING: No ERR weighting will be applied to the mask ' 'used in the final drizzle step! Weighting will be only ' 'by exposure time.\n\nThe data provided as input does not ' 'contain an ERR extension'), file=sys.stderr) print('\n Continue with final drizzle step...', sys.stderr) else: # If we were unable to find an 'ERR' extension to apply, one # possible reason was that the input was a 'standard' WFPC2 data # file that does not actually contain an error array. Test for # this condition and issue a Warning to the user and continue on to # the final drizzle. print(textutil.textbox( "WARNING: No ERR weighting will be applied to the mask used " "in the final drizzle step! Weighting will be only by " "exposure time.\n\nThe WFPC2 data provided as input does not " "contain ERR arrays. WFPC2 data is not supported by this " "weighting type.\n\nA workaround would be to create inverse " "variance maps and use 'IVM' as the final_wht_type. See the " "HELP file for more details on using inverse variance maps."), file=sys.stderr) print("\n Continue with final drizzle step...", file=sys.stderr) return errmask.astype(np.float32)
[docs] def updateIVMName(self,ivmname): """ Update outputNames for image with user-supplied IVM filename. """ self.outputNames['ivmFile'] = ivmname
[docs] def set_mt_wcs(self, image): """ Reset the WCS for this image based on the WCS information from another imageObject. """ for chip in range(1,self._numchips+1,1): sci_chip = self._image[self.scienceExt,chip] ref_chip = image._image[image.scienceExt,chip] # Do we want to keep track of original WCS or not? No reason now... sci_chip.wcs = ref_chip.wcs.copy()
[docs] def set_wtscl(self, chip, wtscl_par): """ Sets the value of the wt_scl parameter as needed for drizzling. """ sci_chip = self._image[self.scienceExt,chip] exptime = 1 #sci_chip._exptime _parval = 'unity' if wtscl_par is not None: if type(wtscl_par) == type(''): if not wtscl_par.isdigit(): # String passed in as value, check for 'exptime' or 'expsq' _wtscl_float = None try: _wtscl_float = float(wtscl_par) except ValueError: _wtscl_float = None if _wtscl_float is not None: _wtscl = _wtscl_float elif wtscl_par == 'expsq': _wtscl = exptime*exptime _parval = 'expsq' else: # Default to the case of 'exptime', if # not explicitly specified as 'expsq' _wtscl = exptime else: # int value passed in as a string, convert to float _wtscl = float(wtscl_par) else: # We have a non-string value passed in... _wtscl = float(wtscl_par) else: # Default case: wt_scl = exptime _wtscl = exptime sci_chip._wtscl_par = _parval sci_chip._wtscl = _wtscl
[docs] def set_units(self): """ Record the units for this image, both BUNITS from header and in_units as needed internally. This method will be defined specifically for each instrument. """ pass
[docs] def getInstrParameter(self, value, header, keyword): """ This method gets a instrument parameter from a pair of task parameters: a value, and a header keyword. The default behavior is: - if the value and header keyword are given, raise an exception. - if the value is given, use it. - if the value is blank and the header keyword is given, use the header keyword. - if both are blank, or if the header keyword is not found, return None. """ if isinstance(value, str) and value in ['None', '', ' ', 'INDEF']: value = None if value and (keyword is not None and keyword.strip() != ''): exceptionMessage = "ERROR: Your input is ambiguous! Please specify either a value or a keyword.\n You specifed both " + str(value) + " and " + str(keyword) raise ValueError(exceptionMessage) elif value is not None and value != '': return self._averageFromList(value) elif keyword is not None and keyword.strip() != '': return self._averageFromHeader(header, keyword) else: return None
def _averageFromHeader(self, header, keyword): """ Averages out values taken from header. The keywords where to read values from are passed as a comma-separated list. """ _list = '' for _kw in keyword.split(','): if _kw in header: _list = _list + ',' + str(header[_kw]) else: return None return self._averageFromList(_list) def _averageFromList(self, param): """ Averages out values passed as a comma-separated list, disregarding the zero-valued entries. """ _result = 0.0 _count = 0 for _param in param.split(','): if _param != '' and float(_param) != 0.0: _result = _result + float(_param) _count += 1 if _count >= 1: _result = _result / _count return _result
[docs]class imageObject(baseImageObject): """ This returns an imageObject that contains all the necessary information to run the image file through any multidrizzle function. It is essentially a PyFits object with extra attributes. There will be generic keywords which are good for the entire image file, and some that might pertain only to the specific chip. """ def __init__(self,filename, output=None, group=None,inmemory=False): super().__init__(filename) #filutil open returns a fits object try: self._image=fileutil.openImage(filename, clobber=False, memmap=False) except IOError: raise IOError("Unable to open file: %s" % filename) #populate the global attributes which are good for all the chips in the file #self._rootname=self._image['PRIMARY'].header["ROOTNAME"] self._rootname=fileutil.buildNewRootname(filename) self.outputNames=self._setOutputNames(self._rootname) self.outroot = '_'.join(output.split('_')[:-1]) if output else '_'.join(filename.split('_')[:-1]) self.outroot = self.outroot.lower() # flag to indicate whether or not to write out intermediate products # to disk (default) or keep everything in memory self.inmemory = inmemory self._initVirtualOutputs() #self._exptime=self._image["PRIMARY"].header["EXPTIME"] #exptime should be set in the image subclass code since it's kept in different places # if(self._exptime == 0): self._exptime =1. #to avoid divide by zero # print "Setting exposure time to 1. to avoid div/0!" #this is the number of science chips to be processed in the file self._numchips=self._countEXT(extname=self.scienceExt) self.proc_unit = None #self._nextend=self._image["PRIMARY"].header["NEXTEND"] self._nextend = self._countEXT(extname=None) if (self._numchips == 0): #the simple fits image contains the data in the primary extension, #this will help us deal with the rest of the code that looks #and acts on chips :) #self._nextend=1 self._numchips=1 self.scienceExt="PRIMARY" self.maskExt=None self._image["PRIMARY"].header["EXTNAME"] = "PRIMARY" self._image["PRIMARY"].header["EXTVER"] = 1 self._image["PRIMARY"].extnum = 0 self._isSimpleFits = False # Clean out any stray MDRIZSKY keywords from PRIMARY headers fimg = fileutil.openImage(filename, mode='update', memmap=False) if 'MDRIZSKY' in fimg['PRIMARY'].header: del fimg['PRIMARY'].header['MDRIZSKY'] fimg.close() del fimg if group not in [None,'']: # Only use selected chip if ',' in group: group_id = group.split(',') if group_id[0].isalpha(): # user specified a specific extname,extver self.group = [int(group_id[1])] else: # user specified a list of extension numbers to process self.group = [] for grp in group_id: # find extname/extver which corresponds to this extension number group_extname = self._image[int(grp)].header['EXTNAME'] group_extver = self._image[int(grp)].header['EXTVER'] self.group.append(group_extver) else: # find extname/extver which corresponds to this extension number group_extver = self._image[int(group)].header['EXTVER'] self.group = [int(group_extver)] else: # Use all chips self.group = None if not self._isSimpleFits: #assign chip specific information for chip in range(1,self._numchips+1,1): self._assignRootname(chip) sci_chip = self._image[self.scienceExt,chip] # Set a flag to indicate whether this chip should be included # or not, based on user input from the 'group' parameter. if self.group is None or (self.group is not None and chip in self.group): sci_chip.group_member = True self._nmembers += 1 else: sci_chip.group_member = False sci_chip.signature = None sci_chip.dqname = None sci_chip.dqmaskname = None sci_chip.dqfile,sci_chip.dq_extn = self.find_DQ_extension() #self.maskExt = sci_chip.dq_extn if(sci_chip.dqfile is not None): sci_chip.dqname = sci_chip.dqfile +'['+sci_chip.dq_extn+','+str(chip)+']' # build up HSTWCS object for each chip, which will be necessary for drizzling operations sci_chip.wcs=wcs_functions.get_hstwcs(self._filename,self._image,sci_chip.extnum) sci_chip.detnum,sci_chip.binned = util.get_detnum(sci_chip.wcs,self._filename,chip) sci_chip.wcslin_pscale = 1.0 #assuming all the chips don't have the same dimensions in the file sci_chip._naxis1=sci_chip.header["NAXIS1"] sci_chip._naxis2=sci_chip.header["NAXIS2"] # record the exptime values for this chip so that it can be # easily used to generate the composite value for the final output image sci_chip._expstart,sci_chip._expend = util.get_expstart(sci_chip.header,self._image['PRIMARY'].header) sci_chip.outputNames=self._setChipOutputNames(sci_chip.rootname,chip).copy() #this is a dictionary # Set the units: both bunit and in_units self.set_units(chip) #initialize gain, readnoise, and exptime attributes # the actual values will be set by each instrument based on # keyword names specific to that instrument by 'setInstrumentParamters()' sci_chip._headergain = 1 # gain value read from header sci_chip._gain = 1.0 # calibrated gain value sci_chip._rdnoise = 1.0 # calibrated readnoise sci_chip._exptime = 1.0 sci_chip._effGain = 1.0 sci_chip._conversionFactor = 1.0 sci_chip._wtscl = 1.0 # Keep track of the sky value that should be subtracted from this chip # Read in value from image header, in case user has already # determined the sky level # # .computedSky: value to be applied by the # adrizzle/ablot steps. # .subtractedSky: value already (or will be by adrizzle/ablot) # subtracted from the image # if "MDRIZSKY" in sci_chip.header: subsky = sci_chip.header['MDRIZSKY'] log.info('Reading in MDRIZSKY of %s' % subsky) sci_chip.subtractedSky = subsky sci_chip.computedSky = subsky else: sci_chip.subtractedSky = 0.0 sci_chip.computedSky = None sci_chip.darkcurrent = 0.0 # The following attributes are used when working with sub-arrays # and get reference file arrays for auto-generation of IVM masks try: sci_chip.ltv1 = sci_chip.header['LTV1'] * -1 sci_chip.ltv2 = sci_chip.header['LTV2'] * -1 except KeyError: sci_chip.ltv1 = 0 sci_chip.ltv2 = 0 if sci_chip.ltv1 < 0: sci_chip.ltv1 = 0 if sci_chip.ltv2 < 0: sci_chip.ltv2 = 0 sci_chip.size1 = sci_chip.header['NAXIS1'] + np.round(sci_chip.ltv1) sci_chip.size2 = sci_chip.header['NAXIS2'] + np.round(sci_chip.ltv2) #sci_chip.image_shape = (sci_chip.size2,sci_chip.size1) sci_chip.image_shape = (sci_chip.header['NAXIS2'],sci_chip.header['NAXIS1']) # Interpret the array dtype by translating the IRAF BITPIX value if sci_chip.header['BITPIX'] in _IRAF_DTYPES_TO_NUMPY: sci_chip.image_dtype = _IRAF_DTYPES_TO_NUMPY[sci_chip.header['BITPIX']] if self.inmemory: # read image data array into memory shape = sci_chip.data.shape
[docs] def setInstrumentParameters(self,instrpars): """ Define instrument-specific parameters for use in the code. By definition, this definition will need to be overridden by methods defined in each instrument's sub-class. """ pass
[docs] def compute_wcslin(self,undistort=True): """ Compute the undistorted WCS based solely on the known distortion model information associated with the WCS. """ for chip in range(1,self._numchips+1,1): sci_chip = self._image[self.scienceExt,chip] chip_wcs = sci_chip.wcs.copy() if chip_wcs.sip is None or not undistort or chip_wcs.instrument=='DEFAULT': chip_wcs.sip = None chip_wcs.cpdis1 = None chip_wcs.cpdis2 = None chip_wcs.det2im = None undistort=False # compute the undistorted 'natural' plate scale for this chip wcslin = distortion.utils.output_wcs([chip_wcs],undistort=undistort) sci_chip.wcslin_pscale = wcslin.pscale
[docs] def set_units(self,chip): """ Define units for this image. """ # Determine output value of BUNITS # and make sure it is not specified as 'ergs/cm...' sci_chip = self._image[self.scienceExt,chip] _bunit = None if 'BUNIT' in sci_chip.header and sci_chip.header['BUNIT'].find('ergs') < 0: _bunit = sci_chip.header['BUNIT'] else: _bunit = 'ELECTRONS/S' sci_chip._bunit = _bunit # if '/s' in _bunit.lower(): _in_units = 'cps' else: _in_units = 'counts' sci_chip.in_units = _in_units
[docs]class WCSObject(baseImageObject): def __init__(self,filename,suffix='_drz'): super().__init__(filename) self._image = fits.HDUList() self._image.append(fits.PrimaryHDU()) # Build rootname, but guard against the rootname being given without # the '_drz.fits' suffix #patt = re.compile(r"_dr[zc]\w*.fits$") drz_extn = suffix patt = re.compile(r"_dr[zc]") m = patt.search(filename) if m: self._rootname = filename[:m.start()] drz_extn = m.group() else: # Guard against having .fits in the rootname if '.fits' in filename: self._rootname = filename[:filename.find('.fits')] drz_extn = '' else: self._rootname = filename self.outputNames = self._setOutputNames(self._rootname,suffix=drz_extn) self.nimages = 1 self._bunit = 'ELECTRONS/S' self.default_wcs = None self.final_wcs = None self.single_wcs = None
[docs] def restore_wcs(self): self.wcs = copy.copy(self.default_wcs)