Source code for punchpipe.flows.level1

import json
import typing as t
from datetime import datetime, timedelta

from prefect import flow, get_run_logger, task
from prefect.cache_policies import NO_CACHE
from punchbowl.level1.flow import level1_early_core_flow, level1_late_core_flow
from sqlalchemy import func, text
from sqlalchemy.orm import aliased

from punchpipe import __version__
from punchpipe.control import cache_layer
from punchpipe.control.db import File, FileRelationship, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.flows.util import file_name_to_full_path, summarize_files_missing_cal_files

# File type codes used by the query functions in this module to select inputs and detect existing outputs.

# Level 0 file types selected as inputs by level1_early_query_ready_files.
SCIENCE_LEVEL0_TYPE_CODES = ["PM", "PZ", "PP", "CR"]
# Level 1 file types selected as inputs by level1_late_query_ready_files.
SCIENCE_LEVEL1_LATE_INPUT_TYPE_CODES = ["XM", "XZ", "XP", "XR"]
# Child file types whose existence marks a late input as already handled.
SCIENCE_LEVEL1_LATE_OUTPUT_TYPE_CODES = ["PM", "PZ", "PP", "CR"]
# Level 1 file types selected as inputs by level1_quick_query_ready_files.
SCIENCE_LEVEL1_QUICK_INPUT_TYPE_CODES = ["XR"]
# Child file types whose existence marks a quick input as already handled.
SCIENCE_LEVEL1_QUICK_OUTPUT_TYPE_CODES = ["QR"]

@task(cache_policy=NO_CACHE)
def level1_early_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Find created Level 0 science files that have every calibration product needed for level1_early.

    Files are considered newest-first. A file is only returned when a quartic model, a "before"
    vignetting function and a mask were all found for it; the chosen calibration records are attached
    to the file object as ``quartic_model``, ``vignetting_functions`` and ``mask_file``.

    Parameters
    ----------
    session
        Database session used for all queries.
    pipeline_config : dict
        Pipeline configuration, forwarded to the calibration look-ups.
    reference_time
        Unused here.
    max_n
        Stop collecting once this many files are ready.

    Returns
    -------
    list
        One single-element list ``[file]`` per ready file.
    """
    logger = get_run_logger()

    candidates = (session.query(File)
                  .filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
                  .filter(File.state == "created")
                  .filter(File.level == "0")
                  .order_by(File.date_obs.desc())
                  .all())

    # Batch look-ups: each returns one entry per candidate, in the same order.
    quartic_models = get_quartic_model_paths(candidates, pipeline_config, session)
    vignetting_functions = get_vignetting_function_paths(candidates, pipeline_config, session)
    mask_files = get_mask_files(candidates, pipeline_config, session)

    schedulable = []
    lacking_quartic = []
    lacking_vignetting = []
    lacking_mask = []
    for candidate, quartic, vignetting_pair, mask in zip(
            candidates, quartic_models, vignetting_functions, mask_files):
        if quartic is None:
            lacking_quartic.append(candidate)
        elif vignetting_pair[0] is None:
            lacking_vignetting.append(candidate)
        elif mask is None:
            lacking_mask.append(candidate)
        else:
            # Smuggle the identified models out of this function on the file object itself
            candidate.quartic_model = quartic
            candidate.vignetting_functions = vignetting_pair
            candidate.mask_file = mask
            schedulable.append([candidate])
            if len(schedulable) >= max_n:
                break

    for prefix, skipped in (
            ("Missing quartic files for ", lacking_quartic),
            ("Missing vignetting for ", lacking_vignetting),
            ("Missing mask for ", lacking_mask),
    ):
        if skipped:
            logger.info(prefix + summarize_files_missing_cal_files(skipped))

    return schedulable
def get_distortion_paths(level0_files, pipeline_config: dict, session=None):
    """Select a distortion ('DS') model record for each input file using a single query.

    Parameters
    ----------
    level0_files
        Files to match; each must expose ``observatory`` and ``date_obs``.
    pipeline_config : dict
        Unused here.
    session
        Database session used for the query.

    Returns
    -------
    list
        One entry per input file, in order: the first model (highest file version first, then
        latest ``date_obs``) from the same observatory whose ``date_obs`` is not after the file's,
        or None when there is no such model.
    """
    candidates = (session.query(File)
                  .filter(File.file_type == 'DS')
                  .where(File.file_version.not_like("v%"))  # filters out "v0a"
                  .order_by(File.file_version.desc(), File.date_obs.desc())
                  .all())

    # The candidates are already sorted by preference, so the first applicable one wins.
    return [
        next((model for model in candidates
              if model.observatory == l0_file.observatory and model.date_obs <= l0_file.date_obs),
             None)
        for l0_file in level0_files
    ]
def get_distortion_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
    """Query the preferred distortion ('DS') model record for a single file.

    The match is restricted to the file's observatory and to models whose ``date_obs`` is not after
    the file's. Results are ordered by file version (descending) and then ``date_obs`` (descending).

    Returns
    -------
    File or None
        The first matching record, or None when the query has no rows.
    """
    query = session.query(File)
    query = query.filter(File.file_type == "DS")
    query = query.filter(File.observatory == level0_file.observatory)
    query = query.where(File.date_obs <= level0_file.date_obs)
    query = query.where(File.file_version.not_like("v%"))  # filters out "v0a"
    query = query.order_by(File.file_version.desc(), File.date_obs.desc())
    return query.first()
# Maps a Level 0 science file type code to the file type code of its vignetting function.
VIGNETTING_CORRESPONDING_TYPES = {"PM": "GM", "PZ": "GZ", "PP": "GP", "CR": "GR"}
def get_vignetting_function_paths(level0_files, pipeline_config: dict, session=None):
    """Select vignetting function records for each input file using a single query.

    Parameters
    ----------
    level0_files
        Files to match; each must expose ``file_type``, ``observatory`` and ``date_obs``.
    pipeline_config : dict
        Unused here.
    session
        Database session used for the query.

    Returns
    -------
    list[tuple]
        One ``(before_model, after_model)`` pair per input file, in order. ``before_model`` is the
        highest-version, latest model of the corresponding type and observatory dated at or before
        the observation (or None). ``after_model`` is only looked up for observatory '4': it is the
        highest-version, earliest model dated at or after the observation (or None); for every
        other observatory it is None.

    Raises
    ------
    KeyError
        If a file's ``file_type`` has no entry in ``VIGNETTING_CORRESPONDING_TYPES``.
    """
    # Get all models: highest file version first and, within a version, in reverse-chronological order
    models = (session.query(File)
              .filter(File.file_type.in_(['GM', 'GZ', 'GP', 'GR']))
              .where(File.file_version.not_like("v%"))  # filters out "v0a".
              .order_by(File.file_version.desc(), File.date_obs.desc()).all())

    results = []
    for l0_file in level0_files:
        target_type = VIGNETTING_CORRESPONDING_TYPES[l0_file.file_type]
        applicable = [model for model in models
                      if model.observatory == l0_file.observatory and model.file_type == target_type]

        # We want the latest model that's before the observation, so we go backwards in time, past any
        # later-in-time models, until we hit the first model that's not after the observation.
        before_model = next((model for model in applicable if model.date_obs <= l0_file.date_obs), None)

        after_model = None
        if l0_file.observatory == '4':
            # Walking the reversed list would favour the *lowest* file version. Instead, stay in the
            # version-descending order: the first hit fixes the (highest) version, and later hits of
            # that same version have earlier dates, so the last one kept is the earliest model that
            # is not before the observation. This matches get_vignetting_function_path's ordering.
            for model in applicable:
                if model.date_obs < l0_file.date_obs:
                    continue
                if after_model is not None and model.file_version != after_model.file_version:
                    break
                after_model = model

        results.append((before_model, after_model))
    return results
def get_vignetting_function_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
    """Query the vignetting function record(s) for a single file.

    Returns
    -------
    File, None or tuple
        For observatory '4', a ``(before, after)`` tuple: the preferred model dated at or before the
        observation and the preferred model dated at or after it (either may be None). For any other
        observatory, only the "before" record (or None) is returned, not a tuple.

    Raises
    ------
    KeyError
        If the file's ``file_type`` has no entry in ``VIGNETTING_CORRESPONDING_TYPES``.
    """
    wanted_type = VIGNETTING_CORRESPONDING_TYPES[level0_file.file_type]

    def matching_models():
        # Fresh base query restricted to the right model type and observatory
        return (session.query(File)
                .filter(File.file_type == wanted_type)
                .filter(File.observatory == level0_file.observatory))

    latest_before = (matching_models()
                     .where(File.date_obs <= level0_file.date_obs)
                     .where(File.file_version.not_like("v%"))  # filters out "v0a".
                     .order_by(File.file_version.desc(), File.date_obs.desc())
                     .first())

    if level0_file.observatory != '4':
        return latest_before

    earliest_after = (matching_models()
                      .where(File.date_obs >= level0_file.date_obs)
                      .where(File.file_version.not_like("v%"))  # filters out "v0a".
                      .order_by(File.file_version.desc(), File.date_obs.asc())
                      .first())
    return latest_before, earliest_after
# Maps a science file type code (Level 0 "P*"/"CR" or Level 1 "X*") to the file type code of its PSF model.
PSF_MODEL_CORRESPONDING_TYPES = {"PM": "RM", "PZ": "RZ", "PP": "RP", "CR": "RC",
                                 "XM": "RM", "XZ": "RZ", "XP": "RP", "XR": "RC"}
def get_psf_model_paths(level0_files, pipeline_config: dict, session=None):
    """Select a PSF model file name for each input file using a single query.

    Parameters
    ----------
    level0_files
        Files to match; each must expose ``file_type``, ``observatory`` and ``date_obs``.
    pipeline_config : dict
        Unused here.
    session
        Database session used for the query.

    Returns
    -------
    list
        One entry per input file, in order: ``""`` for observatory "4" (PSF currently skipped), the
        ``filename()`` of the preferred model dated at or before the observation, or None when no
        model applies.

    Raises
    ------
    KeyError
        If a non-observatory-"4" file's ``file_type`` has no entry in ``PSF_MODEL_CORRESPONDING_TYPES``.
    """
    # All candidate models: highest file version first, then reverse-chronological
    candidates = (session.query(File)
                  .filter(File.file_type.startswith('R'))
                  .where(File.file_version.not_like("v%"))  # filters out "v0a".
                  .order_by(File.file_version.desc(), File.date_obs.desc())
                  .all())

    def select_for(l0_file):
        # TODO - Turn this back on once fine tuned for NFI
        if l0_file.observatory == "4":
            return ""
        wanted_type = PSF_MODEL_CORRESPONDING_TYPES[l0_file.file_type]
        # The candidates are sorted by preference, so the first applicable one is the answer.
        for model in candidates:
            if (model.observatory == l0_file.observatory
                    and model.file_type == wanted_type
                    and model.date_obs <= l0_file.date_obs):
                return model.filename()
        return None

    return [select_for(l0_file) for l0_file in level0_files]
def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None) -> str | None:
    """Query the PSF model file name for a single file.

    Parameters
    ----------
    level0_file
        File to match; must expose ``file_type``, ``observatory`` and ``date_obs``.
    pipeline_config : dict
        Unused here.
    session
        Database session used for the query.
    reference_time
        Unused here.

    Returns
    -------
    str or None
        ``""`` for observatory "4" (PSF currently skipped); otherwise the ``filename()`` of the
        preferred model (highest file version, then latest ``date_obs``) dated at or before the
        observation, or None when no such model exists.

    Raises
    ------
    KeyError
        If the file's ``file_type`` has no entry in ``PSF_MODEL_CORRESPONDING_TYPES``.
    """
    psf_model_type = PSF_MODEL_CORRESPONDING_TYPES[level0_file.file_type]

    # TODO - Turn this back on once fine tuned for NFI
    if level0_file.observatory == "4":
        return ""

    best_model = (session.query(File)
                  .filter(File.file_type == psf_model_type)
                  .filter(File.observatory == level0_file.observatory)
                  .where(File.date_obs <= level0_file.date_obs)
                  .where(File.file_version.not_like("v%"))  # filters out "v0a".
                  .order_by(File.file_version.desc(), File.date_obs.desc()).first())

    # .first() yields None when nothing matches; report that as "no model" (as get_psf_model_paths
    # does) instead of failing with AttributeError on None.filename().
    if best_model is None:
        return None
    return best_model.filename()
# Maps a science file type code (Level 0 "P*"/"CR" or Level 1 "X*") to the file type code of its stray light model.
STRAY_LIGHT_CORRESPONDING_TYPES = {"PM": "SM", "PZ": "SZ", "PP": "SP", "CR": "SR",
                                   "XM": "SM", "XZ": "SZ", "XP": "SP", "XR": "SR"}
def get_two_closest_stray_light(level0_file, session=None, max_distance: timedelta = None):
    """Find the two created stray light models nearest in time to a file.

    Parameters
    ----------
    level0_file
        File to match; must expose ``file_type``, ``observatory`` and ``date_obs``.
    session
        Database session used for the query.
    max_distance : timedelta, optional
        When given (and truthy), only models strictly closer than this are considered.

    Returns
    -------
    list or tuple
        ``(None, None)`` when fewer than two models qualify; otherwise a two-element list of model
        records ordered by ascending ``date_obs``.

    Raises
    ------
    KeyError
        If the file's ``file_type`` has no entry in ``STRAY_LIGHT_CORRESPONDING_TYPES``.
    """
    model_type = STRAY_LIGHT_CORRESPONDING_TYPES[level0_file.file_type]

    # Absolute separation, in seconds, between each model and the observation (computed in SQL)
    separation = func.abs(func.timestampdiff(text("second"), File.date_obs, level0_file.date_obs))

    query = (session.query(File, separation)
             .filter(File.file_type == model_type)
             .filter(File.observatory == level0_file.observatory)
             .filter(File.state == "created"))
    if max_distance:
        query = query.filter(separation < max_distance.total_seconds())

    rows = query.order_by(separation.asc()).limit(2).all()
    if len(rows) < 2:
        return None, None

    # Keep only the File objects, dropping the separation column
    nearest, next_nearest = rows[0][0], rows[1][0]
    if next_nearest.date_obs < nearest.date_obs:
        return [next_nearest, nearest]
    return [nearest, next_nearest]
def get_two_best_stray_light(level0_file, session=None):
    """Pick the pair of stray light models to use for a file, or signal that it must wait.

    The models immediately before and after the observation (any state) are looked up first. If both
    are "created" they are returned. If either is "impossible", the two non-impossible Level 1 models
    nearest in time are used instead, provided both are "created".

    Parameters
    ----------
    level0_file
        File to match; must expose ``file_type``, ``observatory`` and ``date_obs``.
    session
        Database session used for the queries.

    Returns
    -------
    tuple
        ``(before_model, after_model)`` when a usable pair exists, otherwise ``(None, None)``.

    Raises
    ------
    KeyError
        If the file's ``file_type`` has no entry in ``STRAY_LIGHT_CORRESPONDING_TYPES``.
    """
    model_type = STRAY_LIGHT_CORRESPONDING_TYPES[level0_file.file_type]
    before_model = (session.query(File)
                    .filter(File.file_type == model_type)
                    .filter(File.observatory == level0_file.observatory)
                    .filter(File.date_obs < level0_file.date_obs)
                    .order_by(File.date_obs.desc()).first())
    after_model = (session.query(File)
                   .filter(File.file_type == model_type)
                   .filter(File.observatory == level0_file.observatory)
                   .filter(File.date_obs > level0_file.date_obs)
                   .order_by(File.date_obs.asc()).first())

    if before_model is None or after_model is None:
        # We're waiting for the scheduler to fill in here and tell us what's what
        return None, None
    elif before_model.state == "created" and after_model.state == "created":
        # Good to go!
        return before_model, after_model
    elif before_model.state == "impossible" or after_model.state == "impossible":
        # Flexible mode
        dt = func.abs(func.timestampdiff(text("second"), File.date_obs, level0_file.date_obs))
        models = (session.query(File, dt)
                  .filter(File.file_type == model_type)
                  .filter(File.observatory == level0_file.observatory)
                  .filter(File.level == '1')
                  .filter(File.state != "impossible")
                  .order_by(dt.asc())
                  .limit(2).all())
        if len(models) < 2:
            # Not enough usable models yet; unpacking below would raise ValueError, so wait instead
            return None, None
        # Drop the dt values
        # NOTE(review): the pair is ordered by closeness, not by date, so "before" may be the later
        # model here -- confirm that downstream code does not rely on chronological order.
        before_model, after_model = [x[0] for x in models]
        if before_model.state == "created" and after_model.state == "created":
            # Good to go!
            return before_model, after_model
        else:
            # Wait for files to generate
            return None, None
    # If we're here, we're waiting for at least one model to generate, but we do expect it to do so
    return None, None
def get_first_last_stray_light(session):
    """Return the earliest and latest ``date_obs`` among created stray light ('S%') files.

    Parameters
    ----------
    session
        Database session used for the query.

    Returns
    -------
    tuple-like
        ``(first, last)``. When no created stray light file exists, the wide-open range
        ``(datetime(1900, 1, 1), datetime(2900, 1, 1))`` is returned instead.
    """
    dates = (session.query(func.min(File.date_obs), func.max(File.date_obs))
             .where(File.file_type.like('S%'))
             .where(File.state == 'created')).all()
    # An aggregate query without GROUP BY yields a single row even when nothing matches, with NULL
    # (None) for MIN and MAX -- so an empty result must be detected by value, not by row count.
    if len(dates) == 0 or dates[0][0] is None or dates[0][1] is None:
        return datetime(1900, 1, 1), datetime(2900, 1, 1)
    return dates[0]
def get_quartic_model_paths(level0_files, pipeline_config: dict, session=None):
    """Select a quartic ('FQ') model record for each input file using a single query.

    Parameters
    ----------
    level0_files
        Files to match; each must expose ``observatory`` and ``date_obs``.
    pipeline_config : dict
        Unused here.
    session
        Database session used for the query.

    Returns
    -------
    list
        One entry per input file, in order: the preferred model (highest file version first, then
        latest ``date_obs``) from the same observatory dated at or before the observation, or None.
    """
    candidates = (session.query(File)
                  .filter(File.file_type == 'FQ')
                  .where(File.file_version.not_like("v%"))  # filters out "v0a".
                  .order_by(File.file_version.desc(), File.date_obs.desc())
                  .all())

    def select_for(l0_file):
        # Candidates are sorted by preference; skip other observatories and later-in-time models.
        return next(
            (model for model in candidates
             if model.observatory == l0_file.observatory and model.date_obs <= l0_file.date_obs),
            None,
        )

    return [select_for(l0_file) for l0_file in level0_files]
def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
    """Query the preferred quartic ('FQ') model record for a single file.

    The match is restricted to the file's observatory and to models whose ``date_obs`` is not after
    the file's. Results are ordered by file version (descending) and then ``date_obs`` (descending).

    Returns
    -------
    File or None
        The first matching record, or None when the query has no rows.
    """
    query = session.query(File)
    query = query.filter(File.file_type == 'FQ')
    query = query.filter(File.observatory == level0_file.observatory)
    query = query.where(File.date_obs <= level0_file.date_obs)
    query = query.where(File.file_version.not_like("v%"))  # filters out "v0a".
    query = query.order_by(File.file_version.desc(), File.date_obs.desc())
    return query.first()
def get_mask_files(level0_files, pipeline_config: dict, session=None):
    """Select a mask ('MS') record for each input file using a single query.

    Parameters
    ----------
    level0_files
        Files to match; each must expose ``observatory`` and ``date_obs``.
    pipeline_config : dict
        Unused here.
    session
        Database session used for the query.

    Returns
    -------
    list
        One entry per input file, in order: the preferred mask (highest file version first, then
        latest ``date_obs``) from the same observatory dated at or before the observation, or None.
    """
    masks = (session.query(File)
             .filter(File.file_type == 'MS')
             .where(File.file_version.not_like("v%"))  # filters out "v0a".
             .order_by(File.file_version.desc(), File.date_obs.desc())
             .all())

    selected = []
    for l0_file in level0_files:
        # Masks are sorted by preference, so the first applicable one is the answer.
        applicable = (mask for mask in masks
                      if mask.observatory == l0_file.observatory and mask.date_obs <= l0_file.date_obs)
        selected.append(next(applicable, None))
    return selected
def get_mask_file(level0_file, pipeline_config: dict, session=None, reference_time=None):
    """Query the preferred mask ('MS') record for a single file.

    The match is restricted to the file's observatory and to masks whose ``date_obs`` is not after
    the file's. Results are ordered by file version (descending) and then ``date_obs`` (descending).

    Returns
    -------
    File or None
        The first matching record, or None when the query has no rows.
    """
    query = session.query(File)
    query = query.filter(File.file_type == 'MS')
    query = query.filter(File.observatory == level0_file.observatory)
    query = query.where(File.date_obs <= level0_file.date_obs)
    query = query.where(File.file_version.not_like("v%"))  # filters out "v0a".
    query = query.order_by(File.file_version.desc(), File.date_obs.desc())
    return query.first()
def get_ccd_parameters(level0_file, pipeline_config: dict, session=None):
    """Look up the CCD gain pair configured for a file's observatory.

    Parameters
    ----------
    level0_file
        File whose ``observatory`` (converted with ``int``) indexes ``pipeline_config['ccd_gain']``.
    pipeline_config : dict
        Configuration holding a ``'ccd_gain'`` entry of two-element (bottom, top) gain pairs.
    session
        Unused here.

    Returns
    -------
    dict
        ``{"gain_bottom": ..., "gain_top": ...}``.
    """
    observatory_index = int(level0_file.observatory)
    bottom, top = pipeline_config['ccd_gain'][observatory_index]
    return dict(gain_bottom=bottom, gain_top=top)
def level1_early_construct_flow_info(level0_files: list[File], level1_files: list[File],
                                     pipeline_config: dict, session=None, reference_time=None):
    """Build the planned ``level1_early`` Flow record for a group of Level 0 files.

    Calibration choices are read from attributes previously attached to the first file
    (``vignetting_functions``, ``quartic_model``, ``mask_file``) and serialized, together with the
    input file names and CCD gains, into the flow's JSON ``call_data``.

    Parameters
    ----------
    level0_files : list[File]
        Input files; the first one supplies the calibration attributes.
    level1_files : list[File]
        Unused here.
    pipeline_config : dict
        Supplies the initial priority and the CCD gain table.
    session
        Forwarded to ``get_ccd_parameters``.
    reference_time
        Unused here.

    Returns
    -------
    Flow
        A new flow in state "planned".
    """
    flow_type = "level1_early"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    primary = level0_files[0]
    first_vignetting = primary.vignetting_functions[0]
    second_vignetting = primary.vignetting_functions[1]
    # The second vignetting function is optional; only resolve its name when present
    second_vignetting_name = None if second_vignetting is None else second_vignetting.filename()
    quartic_model = primary.quartic_model
    gains = get_ccd_parameters(primary, pipeline_config, session=session)
    mask = primary.mask_file

    payload = {
        "input_data": [level0_file.filename() for level0_file in level0_files],
        "vignetting_function_path": first_vignetting.filename(),
        "second_vignetting_function_path": second_vignetting_name,
        "quartic_coefficient_path": quartic_model.filename(),
        "gain_bottom": gains['gain_bottom'],
        "gain_top": gains['gain_top'],
        # The mask is referenced by its '.bin' counterpart rather than the '.fits' name
        "mask_path": mask.filename().replace('.fits', '.bin'),
    }

    return Flow(
        flow_type=flow_type,
        flow_level="1",
        state="planned",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
def level1_early_construct_file_info(level0_files: t.List[File], pipeline_config: dict,
                                     reference_time=None) -> t.List[File]:
    """Describe the single planned Level 1 output produced from the first input file.

    The output's file type is the input's type with its first character replaced by 'X'; the
    observatory, observation date, polarization, outlier and bad-packet fields are copied over.

    Returns
    -------
    list[File]
        A one-element list holding the planned output record.
    """
    source = level0_files[0]
    planned = File(
        level="1",
        file_type='X' + source.file_type[1:],
        observatory=source.observatory,
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=source.date_obs,
        polarization=source.polarization,
        outlier=source.outlier,
        bad_packets=source.bad_packets,
        state="planned",
    )
    return [planned]
@flow
def level1_early_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule level1_early work by delegating to ``generic_scheduler_flow_logic``.

    The generic scheduler is handed this module's level1_early query, file-info and flow-info
    functions, along with the configuration path, reference time and session given here.
    """
    generic_scheduler_flow_logic(
        level1_early_query_ready_files,
        level1_early_construct_file_info,
        level1_early_construct_flow_info,
        pipeline_config_path,
        reference_time=reference_time,
        session=session,
    )
def level1_early_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Prepare stored level1_early call data for execution.

    File names are expanded with ``file_name_to_full_path`` under ``pipeline_config['root']``, the
    quartic and vignetting paths are passed through the cache layer's ``wrap_if_appropriate``, and
    ``max_workers`` is set. The dict is modified in place and also returned.
    """
    path_keys = ('input_data', 'quartic_coefficient_path', 'vignetting_function_path',
                 'second_vignetting_function_path', 'mask_path')
    for key in path_keys:
        call_data[key] = file_name_to_full_path(call_data[key], pipeline_config['root'])

    quartic_cache = cache_layer.quartic_coefficients
    vignetting_cache = cache_layer.vignetting_function

    call_data['quartic_coefficient_path'] = quartic_cache.wrap_if_appropriate(
        call_data['quartic_coefficient_path'])
    call_data['vignetting_function_path'] = vignetting_cache.wrap_if_appropriate(
        call_data['vignetting_function_path'])

    # The second vignetting function is optional, so only wrap it when one was supplied
    second_vignetting = call_data['second_vignetting_function_path']
    if second_vignetting is not None:
        call_data['second_vignetting_function_path'] = vignetting_cache.wrap_if_appropriate(
            second_vignetting)

    # More than 16 workers brings no real benefit (and the n_cpu default on punch190 is actually slower
    # than 16). A smaller number is chosen for less spiky CPU usage, to play better with other flows.
    call_data['max_workers'] = 2
    return call_data
@flow
def level1_early_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run planned level1_early flow(s) by delegating to ``generic_process_flow_logic``.

    ``level1_early_core_flow`` is passed as the core flow and ``level1_early_call_data_processor``
    as the call-data processor.
    """
    generic_process_flow_logic(flow_id, level1_early_core_flow, pipeline_config_path, session=session,
                               call_data_processor=level1_early_call_data_processor)
@task(cache_policy=NO_CACHE)
def level1_late_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Find Level 1 'X*' files that are ready for level1_late processing.

    Candidates are created/progressed Level 1 files of the late input types that have no child of a
    late output type and whose ``date_obs`` lies within the range reported by
    ``get_first_last_stray_light``. They are considered newest-first. A candidate is ready once a
    stray light pair, a distortion model and a PSF entry are all available; these, plus a mask
    record, are attached to the file as ``stray_light``, ``distortion_path``, ``psf_path`` and
    ``mask_path``.

    Parameters
    ----------
    session
        Database session used for all queries.
    pipeline_config : dict
        Pipeline configuration, forwarded to the calibration look-ups.
    reference_time
        Unused here.
    max_n
        Stop collecting once this many files are ready.

    Returns
    -------
    list
        One single-element list ``[file]`` per ready file.
    """
    logger = get_run_logger()
    start_date, end_date = get_first_last_stray_light(session)

    parent_file = aliased(File)
    child_file = aliased(File)
    # True for a File row that is already the parent of a late-output-type file
    already_processed = (session.query(parent_file)
                         .join(FileRelationship, FileRelationship.parent == parent_file.file_id)
                         .join(child_file, FileRelationship.child == child_file.file_id)
                         .filter(parent_file.file_id == File.file_id)
                         .filter(child_file.file_type.in_(SCIENCE_LEVEL1_LATE_OUTPUT_TYPE_CODES))
                         .exists())

    candidates = (session.query(File)
                  .filter(File.file_type.in_(SCIENCE_LEVEL1_LATE_INPUT_TYPE_CODES))
                  .filter(File.level == "1")
                  .filter(File.state.in_(["created", "progressed"]))
                  .filter(~already_processed)
                  .filter(File.date_obs >= start_date)
                  .filter(File.date_obs <= end_date)
                  .order_by(File.date_obs.desc())
                  .all())

    distortion_paths = get_distortion_paths(candidates, pipeline_config, session)
    psf_paths = get_psf_model_paths(candidates, pipeline_config, session)

    schedulable = []
    lacking_stray_light = []
    lacking_distortion = []
    lacking_psf = []
    for candidate, distortion, psf in zip(candidates, distortion_paths, psf_paths):
        stray_light_pair = list(get_two_best_stray_light(candidate, session=session))
        if stray_light_pair == [None, None]:
            lacking_stray_light.append(candidate)
        elif distortion is None:
            lacking_distortion.append(candidate)
        elif psf is None:
            lacking_psf.append(candidate)
        else:
            candidate.distortion_path = distortion
            candidate.psf_path = psf
            candidate.stray_light = stray_light_pair
            schedulable.append([candidate])
            if len(schedulable) >= max_n:
                break

    for prefix, skipped in (
            ("Waiting for stray light models for ", lacking_stray_light),
            ("Missing distortion for ", lacking_distortion),
            ("Missing PSF for ", lacking_psf),
    ):
        if skipped:
            logger.info(prefix + summarize_files_missing_cal_files(skipped))

    # Masks are easiest to batch-query at this point, with every ready File gathered in one list
    ready_files = [group[0] for group in schedulable]
    for group, mask in zip(schedulable, get_mask_files(ready_files, pipeline_config, session)):
        group[0].mask_path = mask

    return schedulable
def level1_late_construct_flow_info(input_files: list[File], output_files: list[File],
                                    pipeline_config: dict, session=None, reference_time=None):
    """Build the planned ``level1_late`` Flow record for a group of input files.

    Calibration choices are read from attributes previously attached to the first file
    (``psf_path``, ``distortion_path``, ``stray_light``, ``mask_path``) and serialized, with the
    input file names, into the flow's JSON ``call_data``. ``output_as_Q_file`` is set to False.

    Parameters
    ----------
    input_files : list[File]
        Input files; the first one supplies the calibration attributes.
    output_files : list[File]
        Unused here.
    pipeline_config : dict
        Supplies the initial priority for this flow type.
    session, reference_time
        Unused here.

    Returns
    -------
    Flow
        A new flow in state "planned".
    """
    flow_type = "level1_late"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    primary = input_files[0]
    psf_model_name = primary.psf_path
    distortion_model = primary.distortion_path
    stray_light_before, stray_light_after = primary.stray_light
    mask = primary.mask_path

    payload = {
        "input_data": [input_file.filename() for input_file in input_files],
        "psf_model_path": psf_model_name,
        "distortion_path": distortion_model.filename(),
        "stray_light_before_path": stray_light_before.filename() if stray_light_before else None,
        "stray_light_after_path": stray_light_after.filename() if stray_light_after else None,
        # The mask is referenced by its '.bin' counterpart rather than the '.fits' name
        "mask_path": mask.filename().replace('.fits', '.bin'),
        "output_as_Q_file": False,
    }

    return Flow(
        flow_type=flow_type,
        flow_level="1",
        state="planned",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
def level1_late_construct_file_info(input_files: t.List[File], pipeline_config: dict,
                                    reference_time=None) -> t.List[File]:
    """Describe the single planned level1_late output produced from the first input file.

    The output's file type is the input's type with its first character replaced by 'C' when the
    input's polarization is 'C', and by 'P' otherwise. The remaining descriptive fields are copied
    from the input.

    Returns
    -------
    list[File]
        A one-element list holding the planned output record.
    """
    source = input_files[0]
    if source.polarization == 'C':
        prefix = 'C'
    else:
        prefix = 'P'

    planned = File(
        level="1",
        file_type=prefix + source.file_type[1:],
        observatory=source.observatory,
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=source.date_obs,
        polarization=source.polarization,
        outlier=source.outlier,
        bad_packets=source.bad_packets,
        state="planned",
    )
    return [planned]
@flow
def level1_late_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule level1_late work by delegating to ``generic_scheduler_flow_logic``.

    The generic scheduler is handed this module's level1_late query, file-info and flow-info
    functions, along with the configuration path, reference time and session given here.
    """
    generic_scheduler_flow_logic(
        level1_late_query_ready_files,
        level1_late_construct_file_info,
        level1_late_construct_flow_info,
        pipeline_config_path,
        reference_time=reference_time,
        session=session,
    )
def level1_late_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Prepare stored level1_late call data for execution.

    File names are expanded with ``file_name_to_full_path`` under ``pipeline_config['root']``, the
    PSF entry is resolved (an empty string means "no PSF" and becomes None), and ``max_workers`` is
    set. The dict is modified in place and also returned.
    """
    path_keys = ('input_data', 'mask_path', 'stray_light_before_path', 'stray_light_after_path',
                 'distortion_path')
    for key in path_keys:
        call_data[key] = file_name_to_full_path(call_data[key], pipeline_config['root'])

    # TODO: this is a hack to skip NFI PSF. Remove!
    # NOTE(review): the cache wrapping is taken to apply only when a PSF name is present -- confirm.
    psf_name = call_data['psf_model_path']
    if psf_name == "":
        call_data['psf_model_path'] = None
    else:
        psf_full_path = file_name_to_full_path(psf_name, pipeline_config['root'])
        call_data['psf_model_path'] = cache_layer.psf.wrap_if_appropriate(psf_full_path)

    # More than 16 workers brings no real benefit (and the n_cpu default on punch190 is actually slower
    # than 16). A smaller number is chosen for less spiky CPU usage, to play better with other flows.
    call_data['max_workers'] = 2
    return call_data
@flow
def level1_late_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run planned level1_late flow(s) by delegating to ``generic_process_flow_logic``.

    ``level1_late_core_flow`` is passed as the core flow and ``level1_late_call_data_processor``
    as the call-data processor.
    """
    generic_process_flow_logic(flow_id, level1_late_core_flow, pipeline_config_path, session=session,
                               call_data_processor=level1_late_call_data_processor)
@task(cache_policy=NO_CACHE)
def level1_quick_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Find Level 1 files that are ready for level1_quick processing.

    Candidates are created/progressed Level 1 files of the quick input types, dated no earlier than
    the flow's optional "no-earlier-than" setting (default "1970-01-01"), that have no child of a
    quick output type. They are considered newest-first. A candidate is ready once the two closest
    stray light models, a distortion model and a PSF entry are all available; these, plus a mask
    record, are attached to the file as ``stray_light``, ``distortion_path``, ``psf_path`` and
    ``mask_path``.

    Parameters
    ----------
    session
        Database session used for all queries.
    pipeline_config : dict
        Pipeline configuration; also forwarded to the calibration look-ups.
    reference_time
        Unused here.
    max_n
        Stop collecting once this many files are ready.

    Returns
    -------
    list
        One single-element list ``[file]`` per ready file.
    """
    logger = get_run_logger()

    parent_file = aliased(File)
    child_file = aliased(File)
    no_earlier_than = pipeline_config["flows"]["level1_quick"].get("no-earlier-than", "1970-01-01")

    # True for a File row that is already the parent of a quick-output-type file
    already_processed = (session.query(parent_file)
                         .join(FileRelationship, FileRelationship.parent == parent_file.file_id)
                         .join(child_file, FileRelationship.child == child_file.file_id)
                         .filter(parent_file.file_id == File.file_id)
                         .filter(child_file.file_type.in_(SCIENCE_LEVEL1_QUICK_OUTPUT_TYPE_CODES))
                         .exists())

    candidates = (session.query(File)
                  .filter(File.file_type.in_(SCIENCE_LEVEL1_QUICK_INPUT_TYPE_CODES))
                  .filter(File.level == "1")
                  .filter(File.state.in_(["created", "progressed"]))
                  .filter(File.date_obs >= no_earlier_than)
                  .filter(~already_processed)
                  .order_by(File.date_obs.desc())
                  .all())

    schedulable = []
    lacking_stray_light = []
    lacking_distortion = []
    lacking_psf = []

    distortion_paths = get_distortion_paths(candidates, pipeline_config, session)
    psf_paths = get_psf_model_paths(candidates, pipeline_config, session)
    for candidate, distortion, psf in zip(candidates, distortion_paths, psf_paths):
        stray_light_pair = list(get_two_closest_stray_light(candidate, session=session))
        if stray_light_pair == [None, None]:
            lacking_stray_light.append(candidate)
        elif distortion is None:
            lacking_distortion.append(candidate)
        elif psf is None:
            lacking_psf.append(candidate)
        else:
            candidate.distortion_path = distortion
            candidate.psf_path = psf
            candidate.stray_light = stray_light_pair
            schedulable.append([candidate])
            if len(schedulable) >= max_n:
                break

    for prefix, skipped in (
            ("Waiting for stray light models for ", lacking_stray_light),
            ("Missing distortion for ", lacking_distortion),
            ("Missing PSF for ", lacking_psf),
    ):
        if skipped:
            logger.info(prefix + summarize_files_missing_cal_files(skipped))

    # Masks are easiest to batch-query at this point, with every ready File gathered in one list
    ready_files = [group[0] for group in schedulable]
    for group, mask in zip(schedulable, get_mask_files(ready_files, pipeline_config, session)):
        group[0].mask_path = mask

    return schedulable
def level1_quick_construct_flow_info(input_files: list[File], output_files: list[File],
                                     pipeline_config: dict, session=None, reference_time=None):
    """Build the planned ``level1_quick`` Flow record for a group of input files.

    Calibration choices are read from attributes previously attached to the first file
    (``psf_path``, ``distortion_path``, ``stray_light``, ``mask_path``) and serialized, with the
    input file names, into the flow's JSON ``call_data``. ``output_as_Q_file`` is set to True.

    Parameters
    ----------
    input_files : list[File]
        Input files; the first one supplies the calibration attributes.
    output_files : list[File]
        Unused here.
    pipeline_config : dict
        Supplies the initial priority for this flow type.
    session, reference_time
        Unused here.

    Returns
    -------
    Flow
        A new flow in state "planned".
    """
    flow_type = "level1_quick"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    primary = input_files[0]
    psf_model_name = primary.psf_path
    distortion_model = primary.distortion_path
    stray_light_before, stray_light_after = primary.stray_light
    mask = primary.mask_path

    payload = {
        "input_data": [input_file.filename() for input_file in input_files],
        "psf_model_path": psf_model_name,
        "distortion_path": distortion_model.filename(),
        "stray_light_before_path": stray_light_before.filename() if stray_light_before else None,
        "stray_light_after_path": stray_light_after.filename() if stray_light_after else None,
        # The mask is referenced by its '.bin' counterpart rather than the '.fits' name
        "mask_path": mask.filename().replace('.fits', '.bin'),
        "output_as_Q_file": True,
    }

    return Flow(
        flow_type=flow_type,
        flow_level="1",
        state="planned",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
def level1_quick_construct_file_info(input_files: t.List[File], pipeline_config: dict,
                                     reference_time=None) -> t.List[File]:
    """Describe the single planned level1_quick output produced from the first input file.

    The output's file type is the input's type with its first character replaced by "Q"; the
    remaining descriptive fields are copied from the input.

    Returns
    -------
    list[File]
        A one-element list holding the planned output record.
    """
    source = input_files[0]
    planned = File(
        level="1",
        file_type="Q" + source.file_type[1:],
        observatory=source.observatory,
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=source.date_obs,
        polarization=source.polarization,
        outlier=source.outlier,
        bad_packets=source.bad_packets,
        state="planned",
    )
    return [planned]
@flow
def level1_quick_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule level1_quick work by delegating to ``generic_scheduler_flow_logic``.

    The generic scheduler is handed this module's level1_quick query, file-info and flow-info
    functions, along with the configuration path, reference time and session given here.
    """
    generic_scheduler_flow_logic(
        level1_quick_query_ready_files,
        level1_quick_construct_file_info,
        level1_quick_construct_flow_info,
        pipeline_config_path,
        reference_time=reference_time,
        session=session,
    )
def level1_quick_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Prepare stored level1_quick call data for execution.

    File names are expanded with ``file_name_to_full_path`` under ``pipeline_config['root']``, the
    PSF entry is resolved (an empty string means "no PSF" and becomes None), and ``max_workers`` is
    set. The dict is modified in place and also returned.
    """
    path_keys = ('input_data', 'mask_path', 'stray_light_before_path', 'stray_light_after_path',
                 'distortion_path')
    for key in path_keys:
        call_data[key] = file_name_to_full_path(call_data[key], pipeline_config['root'])

    # TODO: this is a hack to skip NFI PSF. Remove!
    # NOTE(review): the cache wrapping is taken to apply only when a PSF name is present -- confirm.
    psf_name = call_data['psf_model_path']
    if psf_name == "":
        call_data['psf_model_path'] = None
    else:
        psf_full_path = file_name_to_full_path(psf_name, pipeline_config['root'])
        call_data['psf_model_path'] = cache_layer.psf.wrap_if_appropriate(psf_full_path)

    # More than 16 workers brings no real benefit (and the n_cpu default on punch190 is actually slower
    # than 16). A smaller number is chosen for less spiky CPU usage, to play better with other flows.
    call_data['max_workers'] = 2
    return call_data
@flow
def level1_quick_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run planned level1_quick flow(s) by delegating to ``generic_process_flow_logic``.

    The quick flow reuses ``level1_late_core_flow`` as its core flow, with
    ``level1_quick_call_data_processor`` as the call-data processor.
    """
    generic_process_flow_logic(flow_id, level1_late_core_flow, pipeline_config_path, session=session,
                               call_data_processor=level1_quick_call_data_processor)