"""Prefect scheduling and processing flows for level H (module punchpipe.flows.levelh)."""

import json
import typing as t
from datetime import datetime

from prefect import flow, task
from prefect.cache_policies import NO_CACHE
from punchbowl.level1.flow import levelh_core_flow

from punchpipe import __version__
from punchpipe.control.db import File, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.flows.level1 import get_ccd_parameters, get_psf_model_path
from punchpipe.flows.util import file_name_to_full_path

# Level 0 ``File.file_type`` codes that levelh_query_ready_files considers
# eligible for level H processing.
SCIENCE_LEVEL0_TYPE_CODES = ["PM", "PZ", "PP", "CR"]

@task(cache_policy=NO_CACHE)
def levelh_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Find level 0 science files that are ready for level H processing.

    A file is ready when it has a type in ``SCIENCE_LEVEL0_TYPE_CODES``, is at
    level ``"0"``, is in state ``"quickpunched"``, and ``get_psf_model_path``
    finds a PSF model for it.

    Parameters
    ----------
    session
        Database session used for the file query and the PSF lookups.
    pipeline_config : dict
        Pipeline configuration, forwarded to ``get_psf_model_path``.
    reference_time : optional
        Not used by this function.
    max_n : int or float
        Maximum number of groups to return. A value <= 0 returns no groups.

    Returns
    -------
    list[list]
        One single-element ``[file_id]`` group per ready file, ordered by
        ascending ``date_obs``.
    """
    candidates = (
        session.query(File)
        .filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
        .filter(File.state == "quickpunched")
        .filter(File.level == "0")
        .order_by(File.date_obs.asc())
        # Load all rows up front, since the loop below issues further queries
        # on the same session.
        .all()
    )
    actually_ready = []
    for f in candidates:
        # Enforce the limit before the (DB-backed) PSF lookup. This also means
        # max_n <= 0 yields an empty result instead of one group.
        if len(actually_ready) >= max_n:
            break
        if get_psf_model_path(f, pipeline_config, session=session) is not None:
            actually_ready.append([f.file_id])
    return actually_ready
@task(cache_policy=NO_CACHE)
def levelh_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict,
                               session=None, reference_time=None):
    """Build the planned ``levelh`` Flow record for a group of level 0 files.

    The PSF model and CCD gains are looked up from the first file of the
    group only. NOTE(review): this assumes every file in a group shares them;
    confirm against the grouping done by the query step.

    Parameters
    ----------
    level0_files : list[File]
        Level 0 input files for this flow. Must be non-empty.
    level1_files : File
        Not used by this function.
    pipeline_config : dict
        Pipeline configuration. ``["flows"]["levelh"]["priority"]["initial"]``
        sets the flow priority.
    session : optional
        Database session forwarded to the PSF and CCD parameter lookups.
    reference_time : optional
        Not used by this function.

    Returns
    -------
    Flow
        A Flow with ``flow_type="levelh"``, ``flow_level="H"``, and state
        ``"planned"``. Its ``call_data`` is a JSON string holding the input
        file names, the PSF model file name, and the top/bottom gains.

    Raises
    ------
    ValueError
        If ``level0_files`` is empty, or if no PSF model is found for its
        first file.
    """
    if not level0_files:
        raise ValueError("levelh_construct_flow_info requires at least one level 0 file")

    flow_type = "levelh"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    reference_file = level0_files[0]
    best_psf_model = get_psf_model_path(reference_file, pipeline_config, session=session)
    if best_psf_model is None:
        # The query step only selects files with a PSF model, but the lookup
        # can still come back empty here. Fail with a clear message rather
        # than AttributeError on None.filename().
        raise ValueError(f"No PSF model found for level 0 file {reference_file.filename()}")
    ccd_parameters = get_ccd_parameters(reference_file, pipeline_config, session=session)

    call_data = json.dumps(
        {
            "input_data": [level0_file.filename() for level0_file in level0_files],
            "psf_model_path": best_psf_model.filename(),
            "gain_bottom": ccd_parameters['gain_bottom'],
            "gain_top": ccd_parameters['gain_top'],
        }
    )
    return Flow(
        flow_type=flow_type,
        flow_level="H",
        state=state,
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
@task(cache_policy=NO_CACHE)
def levelh_construct_file_info(level0_files: list[File], pipeline_config: dict,
                               reference_time=None) -> list[File]:
    """Describe the single level H output file produced from a group of level 0 files.

    Metadata is copied from the first level 0 file, with the level set to
    ``"H"``. Caching is disabled, as in the other tasks of this module,
    because the inputs are database ``File`` objects.

    Parameters
    ----------
    level0_files : list[File]
        Level 0 inputs of the flow. Only the first element is read.
    pipeline_config : dict
        Pipeline configuration. ``["file_version"]`` is stamped on the output.
    reference_time : optional
        Not used by this function.

    Returns
    -------
    list[File]
        A one-element list containing the planned level H ``File``.
    """
    template = level0_files[0]
    return [
        File(
            level="H",
            file_type=template.file_type,
            observatory=template.observatory,
            file_version=pipeline_config["file_version"],
            software_version=__version__,
            date_obs=template.date_obs,
            polarization=template.polarization,
            outlier=template.outlier,
            bad_packets=template.bad_packets,
            state="planned",
        )
    ]
@flow
def levelh_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule level H flows for ready level 0 files.

    Wires the level H query, file-info, and flow-info tasks into
    ``generic_scheduler_flow_logic``.

    NOTE(review): the inputs are re-marked ``"quickpunched"``, which is the same
    state ``levelh_query_ready_files`` selects on. Confirm that
    ``generic_scheduler_flow_logic`` prevents rescheduling the same files.
    """
    stage_callbacks = (
        levelh_query_ready_files,
        levelh_construct_file_info,
        levelh_construct_flow_info,
    )
    generic_scheduler_flow_logic(
        *stage_callbacks,
        pipeline_config_path,
        reference_time=reference_time,
        session=session,
        new_input_file_state="quickpunched",
    )
def levelh_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Expand the stored file names in level H call data into full paths.

    ``call_data["input_data"]`` and ``call_data["psf_model_path"]`` are each
    passed through ``file_name_to_full_path`` with ``pipeline_config["root"]``.
    The results are written back into ``call_data`` in place, and the same
    dict is returned. ``session`` is not used.
    """
    path_fields = ("input_data", "psf_model_path")
    for field_name in path_fields:
        stored_name = call_data[field_name]
        call_data[field_name] = file_name_to_full_path(stored_name, pipeline_config["root"])
    return call_data
@flow
def levelh_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Run one scheduled level H flow.

    Delegates to ``generic_process_flow_logic``, with ``levelh_core_flow`` as
    the processing function. ``levelh_call_data_processor`` turns the stored
    file names in the call data into full paths before processing.
    """
    processing_options = {
        "session": session,
        "call_data_processor": levelh_call_data_processor,
    }
    generic_process_flow_logic(flow_id, levelh_core_flow, pipeline_config_path, **processing_options)