Source code for reftest.reftest

from __future__ import print_function
from . import db

import os
from jwst.pipeline import calwebb_dark, calwebb_image2, calwebb_spec2

try:
    from jwst.pipeline import SloperPipeline as Detector1Pipeline
except ImportError:
    from jwst.pipeline import Detector1Pipeline

from jwst import datamodels
import crds
from astropy.io import fits
import logging
from sqlalchemy import or_
try:
    from cStringIO import StringIO      # Python 2
except ImportError:
    from io import StringIO

p_mapping = {
    "META.EXPOSURE.TYPE": "META.EXPOSURE.P_EXPTYPE",
    "META.INSTRUMENT.BAND": "META.INSTRUMENT.P_BAND",
    "META.INSTRUMENT.DETECTOR": "META.INSTRUMENT.P_DETECTOR",
    "META.INSTRUMENT.CHANNEL": "META.INSTRUMENT.P_CHANNEL",
    "META.INSTRUMENT.FILTER": "META.INSTRUMENT.P_FILTER",
    "META.INSTRUMENT.PUPIL": "META.INSTRUMENT.P_PUPIL",
    "META.INSTRUMENT.MODULE": "META.INSTRUMENT.P_MODULE",
    "META.SUBARRAY.NAME": "META.SUBARRAY.P_SUBARRAY",
    "META.INSTRUMENT.GRATING": "META.INSTRUMENT.P_GRATING",
    "META.EXPOSURE.READPATT": "META.EXPOSURE.P_READPATT",
}

meta_to_fits = {
    'META.INSTRUMENT.NAME': 'INSTRUME',
    'META.EXPOSURE.READPATT': 'READPATT',
    'META.EXPOSURE.TYPE': 'EXP_TYPE',
    'META.INSTRUMENT.BAND': 'BAND',
    'META.INSTRUMENT.CHANNEL': 'CHANNEL',
    'META.INSTRUMENT.DETECTOR': 'DETECTOR',
    'META.INSTRUMENT.FILTER': 'FILTER',
    'META.INSTRUMENT.GRATING': 'GRATING',
    'META.SUBARRAY.NAME': 'SUBARRAY'
}

IMAGING = ['fgs_image', 'fgs_focus', 'fgs_skyflat', 'fgs_intflat', 'mir_image',
           'mir_tacq', 'mir_lyot', 'mir_4qpm', 'mir_coroncal', 'nrc_image',
           'nrc_tacq', 'nrc_coron', 'nrc_taconfirm', 'nrc_focus', 'nrc_tsimage',
           'nis_image', 'nis_ami', 'nis_tacq', 'nis_taconfirm', 'nis_focus',
           'nrs_tacq', 'nrs_taslit', 'nrs_taconfirm', 'nrs_confirm', 'nrs_image',
           'nrs_focus', 'nrs_mimf', 'nrs_bota']

[docs]def get_pipelines(exp_type): if 'DARK' in exp_type: return [calwebb_dark.DarkPipeline()] elif 'FLAT' in exp_type: return [Detector1Pipeline()] elif exp_type.lower() in IMAGING: return [Detector1Pipeline(), calwebb_image2.Image2Pipeline()] else: return [Detector1Pipeline(), calwebb_spec2.Spec2Pipeline()]
[docs]def override_reference_file(ref_file, pipeline): dm = datamodels.open(ref_file) for step in pipeline.step_defs.keys(): # check if a step has an override_<reftype> option if hasattr(getattr(pipeline, step), 'override_{}'.format(dm.meta.reftype)): setattr(getattr(pipeline, step), 'override_{}'.format(dm.meta.reftype), ref_file) print('Setting {} in {} step'.format('override_{}'.format(dm.meta.reftype), step)) return pipeline
[docs]def test_reference_file(ref_file, data_file): """ Override CRDS reference file with the supplied reference file and run pipeline with supplied data file. Parameters ---------- ref_file: str Path to reference file. data_file: str Path to data file. """ # redirect pipeline log from sys.stderr to a string log_stream = StringIO() stpipe_log = logging.Logger.manager.loggerDict['stpipe'] stpipe_log.handlers[0].stream = log_stream # allow invalid keyword values os.environ['PASS_INVALID_VALUES'] = '1' print('Testing {}'.format(data_file)) result = data_file try: for pipeline in get_pipelines(fits.getheader(data_file)['EXP_TYPE']): pipeline = override_reference_file(ref_file, pipeline) print('Running {}'.format(pipeline)) result = pipeline.run(result) print('Completed {}'.format(pipeline)) return 1 except Exception as err: print('{} failed with error {}'.format(data_file, (err.message))) return 0
[docs]def find_matches(ref_file, session, max_matches=None): """ Parameters ---------- ref_file: str File path to reference file to test max_matches: int Maximum matches to return. (Default=-1, return all matches) Returns ------- matches: list a list of filenames """ dm = datamodels.open(ref_file) context = crds.heavy_client.get_processing_mode('jwst')[1] pmap = crds.rmap.load_mapping(context) imap = pmap.get_imap(dm.meta.instrument.name) rmap = imap.get_rmap(dm.meta.reftype) meta_attrs = rmap.get_required_parkeys() meta_attrs.remove('META.OBSERVATION.DATE') meta_attrs.remove('META.OBSERVATION.TIME') query_args = [] keys_used = [] for attr in meta_attrs: # Deal with OR values if p_mapping[attr].lower() in dm.to_flat_dict(): p_attr = p_mapping[attr] if '|' in dm[p_attr.lower()]: or_vals = dm[p_attr.lower()].split('|')[:-1] or_vals = [val.strip() for val in or_vals] query_args.append(or_(getattr(db.TestData, meta_to_fits[attr]) == val for val in or_vals)) keys_used.append([meta_to_fits[attr], dm[p_attr.lower()]]) # Ignore special CRDS-only values elif dm[attr.lower()] in ['GENERIC', 'N/A', 'ANY']: pass # Normal values else: query_args.append(getattr(db.TestData, meta_to_fits[attr]) == dm[attr.lower()]) keys_used.append([meta_to_fits[attr], dm[attr.lower()]]) query_string = '\n'.join(['\t{} = {}'.format(key[0], key[1]) for key in keys_used]) print('Searching DB for test data with\n'+query_string) query_result = session.query(db.TestData).filter(*query_args) filenames = [result.filename for result in query_result] print('Found {} instances:'.format(len(filenames)), end="") print('\n'+'\n'.join(['\t'+f for f in filenames])) if filenames: if max_matches > 0: print('Using first {} matches'.format(max_matches)) else: print('\tNo matches found') return filenames[:max_matches]