Source code for drizzlepac.ablot

"""
In this step the median image now gets blotted back to create median-cleaned
images which can be compared directly with each input image to identify
cosmic-rays.

:Authors: Warren Hack

:License: :doc:`LICENSE`

"""
import os
import sys
import numpy as np
from stsci.tools import fileutil, teal, logutil
from . import outputimage
from . import wcs_functions
from . import processInput
from . import util
import stwcs
from stwcs import distortion

try:
    from . import cdriz
except ImportError:
    cdriz = None
    print('\n Coordinate transformation and image resampling library NOT found!')
    print('\n Please check the installation of this package to insure C code was built successfully.')
    raise ImportError

from .version import *

__all__ = ['blot', 'runBlot', 'help', 'getHelpAsString']

__taskname__ = 'drizzlepac.ablot'
_blot_step_num_ = 5

log = logutil.create_logger(__name__, level=logutil.logging.NOTSET)


#
#### User level interface run from TEAL
#

[docs]def blot(data, reference, outdata, configObj=None, wcsmap=wcs_functions.WCSMap, editpars=False, **input_dict): if input_dict is None: input_dict = {} input_dict['data'] = data input_dict['reference'] = reference input_dict['outdata'] = outdata # If called from interactive user-interface, configObj will not be # defined yet, so get defaults using EPAR/TEAL. # # Also insure that the input_dict (user-specified values) are folded in # with a fully populated configObj instance. configObj = util.getDefaultConfigObj(__taskname__, configObj, input_dict, loadOnly=(not editpars)) if configObj is None: return if not editpars: run(configObj, wcsmap=wcsmap)
def run(configObj,wcsmap=None): """ Run the blot task based on parameters provided interactively by the user. """ # Insure all output filenames specified have .fits extensions if configObj['outdata'][-5:] != '.fits': configObj['outdata'] += '.fits' scale_pars = configObj['Data Scaling Parameters'] user_wcs_pars = configObj['User WCS Parameters'] # PyFITS can be used here as it will always operate on # output from PyDrizzle (which will always be a FITS file) # Open the input (drizzled?) image _fname,_sciextn = fileutil.parseFilename(configObj['data']) _inimg = fileutil.openImage(_fname, memmap=False) _expin = fileutil.getKeyword(configObj['data'],scale_pars['expkey'],handle=_inimg) # Return the PyFITS HDU corresponding to the named extension _scihdu = fileutil.getExtn(_inimg,_sciextn) _insci = _scihdu.data.copy() _inexptime = 1.0 if scale_pars['in_units'] == 'counts': if scale_pars['expkey'] in _inimg['PRIMARY'].header: _inexptime = _inimg['PRIMARY'].header[scale_pars['expkey']] elif 'DRIZEXPT' in _inimg['PRIMARY'].header: # Try keyword written out by new 'drizzle' if no valid 'expkey' was given _inexptime = _inimg['PRIMARY'].header['DRIZEXPT'] else: raise ValueError('No valid exposure time keyword could be found ' 'for input %s' % configObj['data']) # always convert input to 'cps' for blot() algorithm if _inexptime != 0.0 or _inexptime != 1.0: np.divide(_insci, _inexptime, _insci) _inimg.close() del _inimg # read in WCS from source (drizzled) image source_wcs = stwcs.wcsutil.HSTWCS(configObj['data']) if source_wcs.wcs.is_unity(): print("WARNING: No valid WCS found for input drizzled image: {}!".format(configObj['data'])) # define blot_wcs blot_wcs = None _refname,_refextn = fileutil.parseFilename(configObj['reference']) if os.path.exists(_refname): # read in WCS from pre-existing output image blot_wcs = stwcs.wcsutil.HSTWCS(configObj['reference']) if blot_wcs.wcs.is_unity(): print("WARNING: No valid WCS found for output image: {} !".format(configObj['reference'])) # define blot WCS based on input images or specified reference WCS values if user_wcs_pars['user_wcs']: blot_wcs = wcs_functions.build_hstwcs( user_wcs_pars['raref'], user_wcs_pars['decref'], user_wcs_pars['xrefpix'], user_wcs_pars['yrefpix'], user_wcs_pars['outnx'], user_wcs_pars['outny'], user_wcs_pars['outscale'], user_wcs_pars['orient'] ) configObj['coeffs'] = None # If blot_wcs is still not defined at this point, we have a problem... if blot_wcs is None: blot_wcs = stwcs.distortion.utils.output_wcs([source_wcs],undistort=False) out_wcs = blot_wcs.copy() # perform blotting operation now _outsci = do_blot(_insci, source_wcs, out_wcs, _expin, coeffs=configObj['coeffs'], interp=configObj['interpol'], sinscl=configObj['sinscl'], stepsize=configObj['stepsize'], wcsmap=wcsmap) # create output with proper units and exptime-scaling if scale_pars['out_units'] == 'counts': if scale_pars['expout'] == 'input': _outscale = fileutil.getKeyword(configObj['reference'],scale_pars['expkey']) #_outscale = _expin else: _outscale = float(scale_pars['expout']) print("Output blotted images scaled by exptime of {}".format(_outscale)) np.multiply(_outsci, _outscale, _outsci) # Add sky back in to the blotted image, as specified by the user if configObj['addsky']: skyval = _scihdu.header['MDRIZSKY'] else: skyval = configObj['skyval'] print("Added {} counts back in to blotted image as sky.".format(skyval)) _outsci += skyval del _scihdu # Write output Numpy objects to a PyFITS file # Blotting only occurs from a drizzled SCI extension # to a blotted SCI extension... outputimage.writeSingleFITS(_outsci,blot_wcs, configObj['outdata'],configObj['reference']) # #### Top-level interface from inside AstroDrizzle #
[docs]def runBlot(imageObjectList, output_wcs, configObj={}, wcsmap=wcs_functions.WCSMap, procSteps=None): """ runBlot(imageObjectList, output_wcs, configObj={}, wcsmap=wcs_functions.WCSMap, procSteps=None) """ if procSteps is not None: procSteps.addStep('Blot') blot_name = util.getSectionName(configObj, _blot_step_num_) # This can be called directly from MultiDrizle, so only execute if # switch has been turned on (no guarantee MD will check before calling). if configObj[blot_name]['blot']: paramDict = buildBlotParamDict(configObj) log.info('USER INPUT PARAMETERS for Blot Step:') util.printParams(paramDict, log=log) run_blot(imageObjectList, output_wcs.single_wcs, paramDict, wcsmap=wcsmap) else: log.info('Blot step not performed.') if procSteps is not None: procSteps.endStep('Blot')
# Run 'drizzle' here... # def buildBlotParamDict(configObj): blot_name = util.getSectionName(configObj,_blot_step_num_) paramDict = {'blot_interp':configObj[blot_name]['blot_interp'], 'blot_sinscl':configObj[blot_name]['blot_sinscl'], 'blot_addsky':configObj[blot_name]['blot_addsky'], 'blot_skyval':configObj[blot_name]['blot_skyval'], 'coeffs':configObj['coeffs']} return paramDict def _setDefaults(configObj={}): """ set up the default parameters to run drizzle build,single,units,wt_scl,pixfrac,kernel,fillval, rot,scale,xsh,ysh,blotnx,blotny,outnx,outny,data """ paramDict={"build":True, "single":True, "in_units":"cps", "wt_scl":1., "pixfrac":1., "kernel":"square", "fillval":999., "rot":0., "scale":1., "xsh":0., "ysh":0., "blotnx":2048, "blotny":2048, "outnx":4096, "outny":4096, "data":None, "driz_separate":True, "driz_combine":False} if(len(configObj) !=0): for key in configObj.keys(): paramDict[key]=configObj[key] return paramDict def run_blot(imageObjectList,output_wcs,paramDict,wcsmap=wcs_functions.WCSMap): """ run_blot(imageObjectList, output_wcs, paramDict, wcsmap=wcs_functions.WCSMap) Perform the blot operation on the list of images. """ # Insure that input imageObject is a list if not isinstance(imageObjectList, list): imageObjectList = [imageObjectList] # # Setup the versions info dictionary for output to PRIMARY header # The keys will be used as the name reported in the header, as-is # _versions = {'AstroDrizzle':__version__, 'PyFITS':util.__fits_version__, 'Numpy':util.__numpy_version__} _hdrlist = [] for img in imageObjectList: for chip in img.returnAllChips(extname=img.scienceExt): print(' Blot: creating blotted image: ',chip.outputNames['data']) #### Check to see what names need to be included here for use in _hdrlist chip.outputNames['driz_version'] = _versions['AstroDrizzle'] outputvals = chip.outputNames.copy() outputvals.update(img.outputValues) outputvals['blotnx'] = chip.wcs.naxis1 outputvals['blotny'] = chip.wcs.naxis2 _hdrlist.append(outputvals) plist = outputvals.copy() plist.update(paramDict) # PyFITS can be used here as it will always operate on # output from PyDrizzle (which will always be a FITS file) # Open the input science file medianPar = 'outMedian' outMedianObj = img.getOutputName(medianPar) if img.inmemory: outMedian = img.outputNames[medianPar] _fname,_sciextn = fileutil.parseFilename(outMedian) _inimg = outMedianObj else: outMedian = outMedianObj _fname,_sciextn = fileutil.parseFilename(outMedian) _inimg = fileutil.openImage(_fname, memmap=False) # Return the PyFITS HDU corresponding to the named extension _scihdu = fileutil.getExtn(_inimg,_sciextn) _insci = _scihdu.data.copy() _inimg.close() del _inimg, _scihdu _outsci = do_blot(_insci, output_wcs, chip.wcs, chip._exptime, coeffs=paramDict['coeffs'], interp=paramDict['blot_interp'], sinscl=paramDict['blot_sinscl'], wcsmap=wcsmap) # Apply sky subtraction and unit conversion to blotted array to # match un-modified input array if paramDict['blot_addsky']: skyval = chip.computedSky else: skyval = paramDict['blot_skyval'] _outsci /= chip._conversionFactor if skyval is not None: _outsci += skyval log.info('Applying sky value of %0.6f to blotted image %s'% (skyval,chip.outputNames['data'])) # Write output Numpy objects to a PyFITS file # Blotting only occurs from a drizzled SCI extension # to a blotted SCI extension... _outimg = outputimage.OutputImage(_hdrlist, paramDict, build=False, wcs=chip.wcs, blot=True) _outimg.outweight = None _outimg.outcontext = None outimgs = _outimg.writeFITS(plist['data'],_outsci,None, versions=_versions,blend=False, virtual=img.inmemory) img.saveVirtualOutputs(outimgs) #_buildOutputFits(_outsci,None,plist['outblot']) _hdrlist = [] del _outsci del _outimg def do_blot(source, source_wcs, blot_wcs, exptime, coeffs = True, interp='poly5', sinscl=1.0, stepsize=10, wcsmap=None): """ Core functionality of performing the 'blot' operation to create a single blotted image from a single source image. All distortion information is assumed to be included in the WCS specification of the 'output' blotted image given in 'blot_wcs'. This is the simplest interface that can be called for stand-alone use of the blotting function. Parameters ---------- source Input numpy array of undistorted source image in units of 'cps'. source_wcs HSTWCS object representing source image distortion-corrected WCS. blot_wcs (py)wcs.WCS object representing the blotted image WCS. exptime exptime to use for scaling output blot image. A value of 1 will result in output blot image in units of 'cps'. coeffs Flag to specify whether or not to use distortion coefficients associated with blot_wcs. If False, do not apply any distortion model. interp Form of interpolation to use when blotting pixels. Valid options:: "nearest","linear","poly3", "poly5"(default), "spline3", "sinc" sinscl Scale for sinc interpolation kernel (in output, blotted pixels) stepsize Number of pixels for WCS interpolation wcsmap Custom mapping class to use to provide transformation from drizzled to blotted WCS. Default will be to use `drizzlepac.wcs_functions.WCSMap`. """ _outsci = np.zeros(blot_wcs.array_shape, dtype=np.float32) # Now pass numpy objects to callable version of Blot... build=False misval = 0.0 kscale = 1.0 xmin = 1 ymin = 1 xmax, ymax = source_wcs.pixel_shape # compute the undistorted 'natural' plate scale for this chip if coeffs: wcslin = distortion.utils.make_orthogonal_cd(blot_wcs) else: wcslin = blot_wcs blot_wcs.sip = None blot_wcs.cpdis1 = None blot_wcs.cpdis2 = None blot_wcs.det2im = None if wcsmap is None and cdriz is not None: """ Use default C mapping function. """ print('Using default C-based coordinate transformation...') mapping = cdriz.DefaultWCSMapping( blot_wcs, source_wcs, blot_wcs.pixel_shape[0], blot_wcs.pixel_shape[1], stepsize ) pix_ratio = source_wcs.pscale/wcslin.pscale else: # ##Using the Python class for the WCS-based transformation # # Use user provided mapping function print('Using coordinate transformation defined by user...') if wcsmap is None: wcsmap = wcs_functions.WCSMap wmap = wcsmap(blot_wcs,source_wcs) mapping = wmap.forward pix_ratio = source_wcs.pscale/wcslin.pscale t = cdriz.tblot( source, _outsci,xmin,xmax,ymin,ymax, pix_ratio, kscale, 1.0, 1.0, 'center',interp, exptime, misval, sinscl, 1, mapping) del mapping return _outsci
[docs]def help(file=None): """ Print out syntax help for running astrodrizzle Parameters ---------- file : str (Default = None) If given, write out help to the filename specified by this parameter Any previously existing file with this name will be deleted before writing out the help. """ helpstr = getHelpAsString(docstring=True, show_ver = True) if file is None: print(helpstr) else: if os.path.exists(file): os.remove(file) f = open(file, mode = 'w') f.write(helpstr) f.close()
[docs]def getHelpAsString(docstring = False, show_ver = True): """ return useful help from a file in the script directory called __taskname__.help """ install_dir = os.path.dirname(__file__) taskname = util.base_taskname(__taskname__, __package__) htmlfile = os.path.join(install_dir, 'htmlhelp', taskname + '.html') helpfile = os.path.join(install_dir, taskname + '.help') if docstring or (not docstring and not os.path.exists(htmlfile)): if show_ver: helpString = os.linesep + \ ' '.join([__taskname__, 'Version', __version__, ' updated on ', __version_date__]) + 2*os.linesep else: helpString = '' if os.path.exists(helpfile): helpString += teal.getHelpFileAsString(taskname, __file__) else: if __doc__ is not None: helpString += __doc__ + os.linesep else: helpString = 'file://' + htmlfile return helpString
blot.__doc__ = getHelpAsString(docstring = True, show_ver = False)