Source code for reftest.reftest

from __future__ import print_function
from . import db
# from . import log

from jwst.pipeline import calwebb_dark, calwebb_sloper, calwebb_image2, calwebb_spec2
import crds
from astropy.io import fits
# import logging
from sqlalchemy import or_

# log = logging.getLogger(__name__)
# log.setLevel(logging.DEBUG)

meta_to_fits = {
    'META.INSTRUMENT.NAME': 'INSTRUME',
    'META.EXPOSURE.READPATT': 'READPATT',
    'META.EXPOSURE.TYPE': 'EXP_TYPE',
    'META.INSTRUMENT.BAND': 'BAND',
    'META.INSTRUMENT.CHANNEL': 'CHANNEL',
    'META.INSTRUMENT.DETECTOR': 'DETECTOR',
    'META.INSTRUMENT.FILTER': 'FILTER',
    'META.INSTRUMENT.GRATING': 'GRATING',
    'META.SUBARRAY.NAME': 'SUBARRAY'
}

IMAGING = ['fgs_image', 'fgs_focus', 'fgs_skyflat', 'fgs_intflat', 'mir_image',
           'mir_tacq', 'mir_lyot', 'mir_4qpm', 'mir_coroncal', 'nrc_image',
           'nrc_tacq', 'nrc_coron', 'nrc_taconfirm', 'nrc_focus', 'nrc_tsimage',
           'nis_image', 'nis_ami', 'nis_tacq', 'nis_taconfirm', 'nis_focus',
           'nrs_tacq', 'nrs_taslit', 'nrs_taconfirm', 'nrs_confirm', 'nrs_image',
           'nrs_focus', 'nrs_mimf', 'nrs_bota']

[docs]def get_pipelines(exp_type): if 'DARK' in exp_type: return [calwebb_dark.DarkPipeline()] elif 'FLAT' in exp_type: return [calwebb_sloper.SloperPipeline()] elif exp_type.lower() in IMAGING: return [calwebb_sloper.SloperPipeline(), calwebb_image2.Image2Pipeline()] else: return [calwebb_sloper.SloperPipeline(), calwebb_spec2.Spec2Pipeline()]
[docs]def override_reference_file(ref_file, pipeline): header = fits.getheader(ref_file) for step in pipeline.step_defs.keys(): # check if a step has an override_<reftype> option if hasattr(getattr(pipeline, step), 'override_{}'.format(header['REFTYPE'].lower())): setattr(getattr(pipeline, step), 'override_{}'.format(header['REFTYPE'].lower()), ref_file) print('Setting {} in {} step'.format('override_{}'.format(header['REFTYPE'].lower()), step)) return pipeline
[docs]def test_reference_file(ref_file, data_file): """ Override CRDS reference file with the supplied reference file and run pipeline with supplied data file. Parameters ---------- ref_file: str Path to reference file. data_file: str Path to data file. """ print('Testing {}'.format(data_file)) result = data_file try: for pipeline in get_pipelines(fits.getheader(data_file)['EXP_TYPE']): pipeline = override_reference_file(ref_file, pipeline) print('Running {}'.format(pipeline)) result = pipeline.run(result) print('Completed {}'.format(pipeline)) return 1 except Exception as err: print('{} failed with error {}'.format(data_file, (err.message))) return 0
[docs]def find_matches(ref_file, session, max_matches=None): """ Parameters ---------- ref_file: str File path to reference file to test max_matches: int Maximum matches to return. (Default=-1, return all matches) Returns ------- matches: list a list of filenames """ header = fits.getheader(ref_file) context = crds.heavy_client.get_processing_mode('jwst')[1] pmap = crds.rmap.load_mapping(context) imap = pmap.get_imap(header['INSTRUME']) rmap = imap.get_rmap(header['REFTYPE']) meta_attrs = rmap.get_required_parkeys() meta_attrs.remove('META.OBSERVATION.DATE') meta_attrs.remove('META.OBSERVATION.TIME') query_args = [] keys_used = [] for attr in meta_attrs: # Ignore special CRDS-only values if header[meta_to_fits[attr]] in ['GENERIC', 'N/A', 'ANY']: pass # Deal with OR values elif '|' in header[meta_to_fits[attr]]: or_vals = header[meta_to_fits[attr]].split('|') query_args.append(or_(getattr(db.TestData, meta_to_fits[attr]) == val for val in or_vals)) keys_used.append(meta_to_fits[attr]) # Normal values else: query_args.append(getattr(db.TestData, meta_to_fits[attr]) == header[meta_to_fits[attr]]) keys_used.append(meta_to_fits[attr]) query_string = '\n'.join(['\t{} = {}'.format(key, header[key]) for key in keys_used]) print('Searching DB for test data with\n'+query_string) query_result = session.query(db.TestData).filter(*query_args) filenames = [result.filename for result in query_result] print('Found {} instances:'.format(len(filenames)), end="") print('\n'+'\n'.join(['\t'+f for f in filenames])) if filenames: if max_matches > 0: print('Using first {} matches'.format(max_matches)) else: print('\tNo matches found') return filenames[:max_matches]