Source code for punchpipe.flows.level3

import json
import typing as t
from datetime import datetime, timedelta

from prefect import flow, get_run_logger, task
from prefect.cache_policies import NO_CACHE
from punchbowl.level3.flow import level3_core_flow, level3_PIM_flow
from sqlalchemy import and_

from punchpipe import __version__
from punchpipe.control.db import File, Flow, get_closest_after_file, get_closest_before_file, get_closest_file
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.control.util import get_database_session
from punchpipe.flows.util import file_name_to_full_path


def get_valid_starfields(session, f: File, timedelta_window: timedelta, file_type: str = "PS"):
    """Return created level-3 mosaic (``'M'``) starfield files near ``f`` in time.

    Parameters
    ----------
    session
        SQLAlchemy session used for the query.
    f : File
        Reference file; its ``date_obs`` is the centre of the search window.
    timedelta_window : timedelta
        Half-width of the window. Files whose ``date_obs`` lies in
        ``[f.date_obs - timedelta_window, f.date_obs + timedelta_window]`` match.
    file_type : str
        Starfield file type to search for (``"PS"`` by default).

    Returns
    -------
    list[File]
        All matching rows.
    """
    valid_star_start = f.date_obs - timedelta_window
    valid_star_end = f.date_obs + timedelta_window
    return (
        session.query(File)
        .filter(File.state == "created")
        .filter(File.level == "3")
        .filter(File.file_type == file_type)
        .filter(File.observatory == "M")
        # Compare the *column* File.date_obs. Comparing the Python value f.date_obs
        # yields constant True and silently disables the time window.
        .filter(File.date_obs >= valid_star_start)
        .filter(File.date_obs <= valid_star_end)
        .all()
    )
def get_valid_fcorona_models(session, f: File, before_timedelta: timedelta, after_timedelta: timedelta,
                             file_type="PF"):
    """Return created level-3 mosaic (``'M'``) F-corona models in an asymmetric window around ``f``.

    Parameters
    ----------
    session
        SQLAlchemy session used for the query.
    f : File
        Reference file; its ``date_obs`` anchors the window.
    before_timedelta : timedelta
        How far before ``f.date_obs`` the window starts.
    after_timedelta : timedelta
        How far after ``f.date_obs`` the window ends.
    file_type : str
        Model file type to search for (``"PF"`` by default).

    Returns
    -------
    list[File]
        Rows with ``f.date_obs - before_timedelta <= date_obs <= f.date_obs + after_timedelta``.
    """
    window_start = f.date_obs - before_timedelta
    window_end = f.date_obs + after_timedelta

    model_query = session.query(File)
    model_query = model_query.filter(File.state == "created")
    model_query = model_query.filter(File.level == "3")
    model_query = model_query.filter(File.file_type == file_type)
    model_query = model_query.filter(File.observatory == 'M')
    model_query = model_query.filter(File.date_obs >= window_start)
    model_query = model_query.filter(File.date_obs <= window_end)
    return model_query.all()
@task(cache_policy=NO_CACHE)
def level3_PTM_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Select level-2 ``PT`` files that have a starfield model available for PTM processing.

    A candidate is a level-2 ``PT`` file in state ``"progressed"`` or ``"created"``.
    It is ready when ``get_valid_starfields`` finds at least one starfield within
    14 days of it.

    Parameters
    ----------
    session
        SQLAlchemy session used for all queries.
    pipeline_config : dict
        Pipeline configuration (not read here).
    reference_time
        Unused; accepted for scheduler interface compatibility.
    max_n
        Maximum number of files to select.

    Returns
    -------
    list[list[int]]
        One ``[file_id]`` group per selected file, ordered by ascending ``date_obs``.
    """
    logger = get_run_logger()

    # NOTE(review): unlike the PIM/CIM queries, "progressed" inputs are eligible here.
    # Confirm that files already scheduled for PTM are not picked up again on later runs.
    candidate_query = (
        session.query(File)
        .where(and_(and_(File.state.in_(["progressed", "created"]), File.level == "2"), File.file_type == "PT"))
        .order_by(File.date_obs.asc())
    )
    candidates = candidate_query.all()
    logger.info(f"{len(candidates)} Level 3 PTM files need to be processed.")

    selected = []
    for candidate in candidates:
        # TODO put magic numbers in config
        starfields = get_valid_starfields(session, candidate, timedelta_window=timedelta(days=14))
        if not starfields:
            continue
        selected.append(candidate)
        if len(selected) >= max_n:
            break

    logger.info(f"{len(selected)} Level 3 PTM files selected with necessary calibration data.")
    return [[chosen.file_id] for chosen in selected]
def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict,
                                   session=None, reference_time=None):
    """Build the planned ``level3_PTM`` Flow for a group of input files.

    The starfield closest to ``level2_files[0]`` (searched within 14 days) is
    recorded in the flow's call data together with the input file names.

    Parameters
    ----------
    level2_files : list[File]
        Input files; the first one anchors the starfield search.
    level3_file : File
        Planned output file(s) (unused here).
    pipeline_config : dict
        Pipeline configuration; ``flows.level3_PTM.priority.initial`` sets the priority.
    session
        Optional SQLAlchemy session. The caller's session is used when given.
        Otherwise a private one is opened with ``get_database_session()`` and
        closed before returning.
    reference_time
        Unused; accepted for scheduler interface compatibility.

    Returns
    -------
    Flow
        A ``"planned"`` level-3 flow whose ``call_data`` is a JSON string.
    """
    flow_type = "level3_PTM"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    # Previously the passed-in session was always overwritten (and the new one leaked),
    # which also prevented tests from injecting their own session.
    owns_session = session is None
    if owns_session:
        session = get_database_session()
    try:
        # TODO put magic numbers in config
        starfield = get_closest_file(
            level2_files[0],
            get_valid_starfields(session, level2_files[0], timedelta_window=timedelta(days=14)),
        )
        call_data = json.dumps(
            {
                "data_list": [level2_file.filename() for level2_file in level2_files],
                "starfield_background_path": starfield.filename(),
            }
        )
    finally:
        if owns_session:
            session.close()

    return Flow(
        flow_type=flow_type,
        state=state,
        flow_level="3",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
def level3_PTM_construct_file_info(input_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Describe the single planned level-3 ``PT`` mosaic file produced from ``input_files``.

    - ``date_obs`` comes from the first input.
    - ``date_beg`` and ``date_end`` span all inputs.
    - ``outlier`` and ``bad_packets`` are set if any input has them set.
    """
    observation_times = [input_file.date_obs for input_file in input_files]
    first_file = input_files[0]
    ptm_file = File(
        level="3",
        file_type="PT",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=first_file.date_obs,
        state="planned",
        date_beg=min(observation_times),
        date_end=max(observation_times),
        outlier=any(input_file.outlier for input_file in input_files),
        bad_packets=any(input_file.bad_packets for input_file in input_files),
    )
    return [ptm_file]
@flow
def level3_PTM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Prefect flow that schedules ``level3_PTM`` work through the generic scheduler logic."""
    generic_scheduler_flow_logic(
        level3_PTM_query_ready_files,
        level3_PTM_construct_file_info,
        level3_PTM_construct_flow_info,
        pipeline_config_path,
        session=session,
        reference_time=reference_time,
    )
def level3_PTM_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Expand the stored file names in ``level3_PTM`` call data in place.

    Each path-bearing key is passed through ``file_name_to_full_path`` with
    ``pipeline_config['root']``.

    ``level3_PTM_construct_flow_info`` only writes ``data_list`` and
    ``starfield_background_path``. The F-corona keys are therefore converted only
    when present. Indexing them unconditionally raised ``KeyError`` for every PTM flow.

    Returns
    -------
    dict
        The same ``call_data`` object, mutated.
    """
    path_keys = ['data_list', 'before_f_corona_model_path', 'after_f_corona_model_path',
                 'starfield_background_path']
    for key in path_keys:
        if key in call_data:
            call_data[key] = file_name_to_full_path(call_data[key], pipeline_config['root'])
    return call_data
@flow
def level3_PTM_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Prefect flow that runs ``level3_core_flow`` for the given ``level3_PTM`` flow id(s)."""
    generic_process_flow_logic(
        flow_id,
        level3_core_flow,
        pipeline_config_path,
        session=session,
        call_data_processor=level3_PTM_call_data_processor,
    )
@task(cache_policy=NO_CACHE)
def level3_PIM_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Select level-2 ``PT`` files that can be turned into level-3 ``PI`` products.

    A file is ready when it is in state ``"created"`` and created ``PF`` F-corona
    models exist both within 14 days before it and within 14 days after it.

    Parameters
    ----------
    session
        SQLAlchemy session used for all queries.
    pipeline_config : dict
        Pipeline configuration (not read here).
    reference_time
        Unused; accepted for scheduler interface compatibility.
    max_n
        Maximum number of files to select.

    Returns
    -------
    list[list[int]]
        One ``[file_id]`` group per selected file, ordered by ascending ``date_obs``.
    """
    logger = get_run_logger()
    all_ready_files = (
        session.query(File)
        .where(and_(and_(File.state == "created", File.level == "2"), File.file_type == "PT"))
        .order_by(File.date_obs.asc())
        .all()
    )
    # These are level-2 inputs; the message previously mislabelled them as "Level 3".
    logger.info(f"{len(all_ready_files)} Level 2 PTM files need to be processed.")

    actually_ready_files = []
    for f in all_ready_files:
        # TODO put magic numbers in config
        valid_before_fcorona_models = get_valid_fcorona_models(
            session, f, before_timedelta=timedelta(days=14), after_timedelta=timedelta(days=0))
        if not valid_before_fcorona_models:
            # No model precedes this file, so the "after" query cannot change the outcome.
            continue
        valid_after_fcorona_models = get_valid_fcorona_models(
            session, f, before_timedelta=timedelta(days=0), after_timedelta=timedelta(days=14))
        if valid_after_fcorona_models:
            actually_ready_files.append(f)
            if len(actually_ready_files) >= max_n:
                break

    logger.info(f"{len(actually_ready_files)} Level 2 PTM files selected with necessary calibration data.")
    return [[f.file_id] for f in actually_ready_files]
def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict,
                                   session=None, reference_time=None):
    """Build the planned ``level3_PIM`` Flow for a group of level-2 files.

    The closest created ``PF`` F-corona models before and after ``level2_files[0]``
    are recorded in the call data. Each is searched within 90 days on its side.

    Parameters
    ----------
    level2_files : list[File]
        Input files; the first one anchors the model search.
    level3_file : File
        Planned output file(s) (unused here).
    pipeline_config : dict
        Pipeline configuration; ``flows.level3_PIM.priority.initial`` sets the priority.
    session
        Optional SQLAlchemy session. The caller's session is used when given.
        Otherwise a private one is opened with ``get_database_session()`` and
        closed before returning.
    reference_time
        Unused; accepted for scheduler interface compatibility.

    Returns
    -------
    Flow
        A ``"planned"`` level-3 flow whose ``call_data`` is a JSON string.
    """
    flow_type = "level3_PIM"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    # Previously the passed-in session was always overwritten (and the new one leaked),
    # which also prevented tests from injecting their own session.
    owns_session = session is None
    if owns_session:
        session = get_database_session()
    try:
        # TODO put magic numbers in config
        before_models = get_valid_fcorona_models(session, level2_files[0],
                                                 before_timedelta=timedelta(days=90),
                                                 after_timedelta=timedelta(days=0))
        after_models = get_valid_fcorona_models(session, level2_files[0],
                                                before_timedelta=timedelta(days=0),
                                                after_timedelta=timedelta(days=90))
        f_corona_before = get_closest_before_file(level2_files[0], before_models)
        f_corona_after = get_closest_after_file(level2_files[0], after_models)
        call_data = json.dumps(
            {
                "data_list": [level2_file.filename() for level2_file in level2_files],
                "before_f_corona_model_path": f_corona_before.filename(),
                "after_f_corona_model_path": f_corona_after.filename(),
            }
        )
    finally:
        if owns_session:
            session.close()

    return Flow(
        flow_type=flow_type,
        state=state,
        flow_level="3",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Describe the single planned level-3 ``PI`` mosaic file produced from ``level2_files``.

    - ``date_obs`` comes from the first input.
    - ``date_beg`` and ``date_end`` span all inputs.
    - ``outlier`` and ``bad_packets`` are set if any input has them set.
    """
    observation_times = [source.date_obs for source in level2_files]
    pim_file = File(
        level="3",
        file_type="PI",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=level2_files[0].date_obs,
        state="planned",
        date_beg=min(observation_times),
        date_end=max(observation_times),
        outlier=any(source.outlier for source in level2_files),
        bad_packets=any(source.bad_packets for source in level2_files),
    )
    return [pim_file]
@flow
def level3_PIM_scheduler_flow(pipeline_config_path: str | None = None, session=None, reference_time: datetime | None = None):
    """Prefect flow that schedules ``level3_PIM`` work through the generic scheduler logic."""
    generic_scheduler_flow_logic(
        level3_PIM_query_ready_files,
        level3_PIM_construct_file_info,
        level3_PIM_construct_flow_info,
        pipeline_config_path,
        session=session,
        reference_time=reference_time,
    )
def level3_PIM_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Expand the stored file names in ``level3_PIM`` call data in place.

    The input list and both F-corona model entries are passed through
    ``file_name_to_full_path`` with ``pipeline_config['root']``. The mutated
    ``call_data`` is returned.
    """
    for path_key in ("data_list", "before_f_corona_model_path", "after_f_corona_model_path"):
        stored_name = call_data[path_key]
        call_data[path_key] = file_name_to_full_path(stored_name, pipeline_config["root"])
    return call_data
@flow
def level3_PIM_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Prefect flow that runs ``level3_PIM_flow`` for the given ``level3_PIM`` flow id(s)."""
    generic_process_flow_logic(
        flow_id,
        level3_PIM_flow,
        pipeline_config_path,
        session=session,
        call_data_processor=level3_PIM_call_data_processor,
    )
@task(cache_policy=NO_CACHE)
def level3_CIM_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Select level-2 ``CT`` files that can be turned into level-3 ``CI`` products.

    A file is ready when it is in state ``"created"`` and created ``CF`` F-corona
    models exist both within 14 days before it and within 14 days after it.

    Returns
    -------
    list[list[int]]
        One ``[file_id]`` group per selected file (at most ``max_n``), ordered by
        ascending ``date_obs``.
    """
    logger = get_run_logger()
    candidates = (
        session.query(File)
        .where(and_(and_(File.state == "created", File.level == "2"), File.file_type == "CT"))
        .order_by(File.date_obs.asc())
        .all()
    )
    logger.info(f"{len(candidates)} Level 2 CTM files need to be processed.")

    selected = []
    for candidate in candidates:
        # TODO put magic numbers in config
        models_before = get_valid_fcorona_models(session, candidate, before_timedelta=timedelta(days=14),
                                                 after_timedelta=timedelta(days=0), file_type="CF")
        models_after = get_valid_fcorona_models(session, candidate, before_timedelta=timedelta(days=0),
                                                after_timedelta=timedelta(days=14), file_type="CF")
        if not (models_before and models_after):
            continue
        selected.append(candidate)
        if len(selected) >= max_n:
            break

    logger.info(f"{len(selected)} Level 2 CTM files selected with necessary calibration data.")
    return [[chosen.file_id] for chosen in selected]
def level3_CIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict,
                                   session=None, reference_time=None):
    """Build the planned ``level3_CIM`` Flow for a group of level-2 files.

    The closest created ``CF`` F-corona models before and after ``level2_files[0]``
    are recorded in the call data. Each is searched within 90 days on its side.

    Parameters
    ----------
    level2_files : list[File]
        Input files; the first one anchors the model search.
    level3_file : File
        Planned output file(s) (unused here).
    pipeline_config : dict
        Pipeline configuration; ``flows.level3_CIM.priority.initial`` sets the priority.
    session
        Optional SQLAlchemy session. The caller's session is used when given.
        Otherwise a private one is opened with ``get_database_session()`` and
        closed before returning.
    reference_time
        Unused; accepted for scheduler interface compatibility.

    Returns
    -------
    Flow
        A ``"planned"`` level-3 flow whose ``call_data`` is a JSON string.
    """
    flow_type = "level3_CIM"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    # Previously the passed-in session was always overwritten (and the new one leaked),
    # which also prevented tests from injecting their own session.
    owns_session = session is None
    if owns_session:
        session = get_database_session()
    try:
        # TODO put magic numbers in config
        before_models = get_valid_fcorona_models(session, level2_files[0],
                                                 before_timedelta=timedelta(days=90),
                                                 after_timedelta=timedelta(days=0), file_type="CF")
        after_models = get_valid_fcorona_models(session, level2_files[0],
                                                before_timedelta=timedelta(days=0),
                                                after_timedelta=timedelta(days=90), file_type="CF")
        f_corona_before = get_closest_before_file(level2_files[0], before_models)
        f_corona_after = get_closest_after_file(level2_files[0], after_models)
        call_data = json.dumps(
            {
                "data_list": [level2_file.filename() for level2_file in level2_files],
                "before_f_corona_model_path": f_corona_before.filename(),
                "after_f_corona_model_path": f_corona_after.filename(),
            }
        )
    finally:
        if owns_session:
            session.close()

    return Flow(
        flow_type=flow_type,
        state=state,
        flow_level="3",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
def level3_CIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Describe the single planned level-3 ``CI`` mosaic file produced from ``level2_files``.

    - ``date_obs`` comes from the first input.
    - ``date_beg`` and ``date_end`` span all inputs.
    - ``outlier`` and ``bad_packets`` are set if any input has them set.
    """
    observation_times = [source.date_obs for source in level2_files]
    cim_file = File(
        level="3",
        file_type="CI",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=level2_files[0].date_obs,
        state="planned",
        date_beg=min(observation_times),
        date_end=max(observation_times),
        outlier=any(source.outlier for source in level2_files),
        bad_packets=any(source.bad_packets for source in level2_files),
    )
    return [cim_file]
@flow
def level3_CIM_scheduler_flow(pipeline_config_path: str | None = None, session=None, reference_time: datetime | None = None):
    """Prefect flow that schedules ``level3_CIM`` work through the generic scheduler logic."""
    generic_scheduler_flow_logic(
        level3_CIM_query_ready_files,
        level3_CIM_construct_file_info,
        level3_CIM_construct_flow_info,
        pipeline_config_path,
        session=session,
        reference_time=reference_time,
    )
def level3_CIM_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Expand the stored file names in ``level3_CIM`` call data in place.

    The input list and both F-corona model entries are passed through
    ``file_name_to_full_path`` with ``pipeline_config['root']``. The mutated
    ``call_data`` is returned.
    """
    for path_key in ("data_list", "before_f_corona_model_path", "after_f_corona_model_path"):
        stored_name = call_data[path_key]
        call_data[path_key] = file_name_to_full_path(stored_name, pipeline_config["root"])
    return call_data
@flow
def level3_CIM_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Prefect flow that processes the given ``level3_CIM`` flow id(s).

    NOTE: this intentionally runs ``level3_PIM_flow``. Per the original author,
    that core flow is flexible enough to handle the clear (``CT``/``CF``) products too.
    """
    generic_process_flow_logic(
        flow_id,
        level3_PIM_flow,
        pipeline_config_path,
        session=session,
        call_data_processor=level3_CIM_call_data_processor,
    )
@task(cache_policy=NO_CACHE)
def level3_CTM_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Select level-3 ``CI`` files that have a ``CS`` starfield available for CTM processing.

    A file is ready when it is in state ``"created"`` and ``get_valid_starfields``
    finds at least one ``CS`` starfield within 14 days of it.

    Returns
    -------
    list[list[int]]
        One ``[file_id]`` group per selected file (at most ``max_n``), ordered by
        ascending ``date_obs``.
    """
    logger = get_run_logger()
    candidates = (
        session.query(File)
        .where(and_(and_(File.state.in_(["created"]), File.level == "3"), File.file_type == "CI"))
        .order_by(File.date_obs.asc())
        .all()
    )
    logger.info(f"{len(candidates)} Level 3 CIM files need to be processed.")

    selected = []
    for candidate in candidates:
        # TODO put magic numbers in config
        starfields = get_valid_starfields(session, candidate, timedelta_window=timedelta(days=14), file_type="CS")
        if not starfields:
            continue
        selected.append(candidate)
        if len(selected) >= max_n:
            break

    logger.info(f"{len(selected)} Level 3 CIM files selected with necessary calibration data.")
    return [[chosen.file_id] for chosen in selected]
def level3_CTM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict,
                                   session=None, reference_time=None):
    """Build the planned ``level3_CTM`` Flow for a group of input files.

    The ``CS`` starfield closest to ``level2_files[0]`` (searched within 90 days)
    is recorded in the call data together with the input file names.

    Parameters
    ----------
    level2_files : list[File]
        Input files; the first one anchors the starfield search.
    level3_file : File
        Planned output file(s) (unused here).
    pipeline_config : dict
        Pipeline configuration; ``flows.level3_CTM.priority.initial`` sets the priority.
    session
        Optional SQLAlchemy session. The caller's session is used when given.
        Otherwise a private one is opened with ``get_database_session()`` and
        closed before returning.
    reference_time
        Unused; accepted for scheduler interface compatibility.

    Returns
    -------
    Flow
        A ``"planned"`` level-3 flow whose ``call_data`` is a JSON string.
    """
    flow_type = "level3_CTM"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    # Previously the passed-in session was always overwritten (and the new one leaked),
    # which also prevented tests from injecting their own session.
    owns_session = session is None
    if owns_session:
        session = get_database_session()
    try:
        # TODO put magic numbers in config. The ready-file query uses a 14-day window;
        # this wider 90-day window therefore always contains the starfield it found.
        starfield = get_closest_file(
            level2_files[0],
            get_valid_starfields(session, level2_files[0], timedelta_window=timedelta(days=90), file_type="CS"),
        )
        call_data = json.dumps(
            {
                "data_list": [level2_file.filename() for level2_file in level2_files],
                "starfield_background_path": starfield.filename(),
            }
        )
    finally:
        if owns_session:
            session.close()

    return Flow(
        flow_type=flow_type,
        state=state,
        flow_level="3",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
def level3_CTM_construct_file_info(input_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Describe the single planned level-3 ``CT`` mosaic file produced from ``input_files``.

    - ``date_obs`` comes from the first input.
    - ``date_beg`` and ``date_end`` span all inputs.
    - ``outlier`` and ``bad_packets`` are set if any input has them set.
    """
    observation_times = [input_file.date_obs for input_file in input_files]
    first_file = input_files[0]
    ctm_file = File(
        level="3",
        file_type="CT",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=first_file.date_obs,
        state="planned",
        date_beg=min(observation_times),
        date_end=max(observation_times),
        outlier=any(input_file.outlier for input_file in input_files),
        bad_packets=any(input_file.bad_packets for input_file in input_files),
    )
    return [ctm_file]
@flow
def level3_CTM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Prefect flow that schedules ``level3_CTM`` work through the generic scheduler logic."""
    generic_scheduler_flow_logic(
        level3_CTM_query_ready_files,
        level3_CTM_construct_file_info,
        level3_CTM_construct_flow_info,
        pipeline_config_path,
        session=session,
        reference_time=reference_time,
    )
def level3_CTM_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Expand the stored file names in ``level3_CTM`` call data in place.

    ``data_list`` and ``starfield_background_path`` are passed through
    ``file_name_to_full_path`` with ``pipeline_config['root']``. The mutated
    ``call_data`` is returned.
    """
    for path_key in ("data_list", "starfield_background_path"):
        stored_name = call_data[path_key]
        call_data[path_key] = file_name_to_full_path(stored_name, pipeline_config["root"])
    return call_data
@flow
def level3_CTM_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Prefect flow that runs ``level3_core_flow`` for the given ``level3_CTM`` flow id(s)."""
    generic_process_flow_logic(
        flow_id,
        level3_core_flow,
        pipeline_config_path,
        session=session,
        call_data_processor=level3_CTM_call_data_processor,
    )