Source code for punchpipe.flows.velocity

import json
from typing import List
from datetime import UTC, datetime, timedelta

from prefect import flow, get_run_logger, task
from punchbowl.level3.velocity import track_velocity

from punchpipe import __version__
from punchpipe.control.db import File, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.flows.util import file_name_to_full_path


@task
def level3_vam_query_ready_files(session, pipeline_config: dict, reference_time: datetime):
    """Find Level 3 CTM files to feed a VAM velocity-tracking flow.

    Selects ``File`` rows with ``state == "created"``, ``level == "3"``,
    ``file_type == "CT"`` and ``observatory == "M"`` whose ``date_obs`` lies in
    the six-hour window ending at ``reference_time`` (both ends inclusive).

    Parameters
    ----------
    session
        Database session used to query the ``File`` table.
    pipeline_config : dict
        Pipeline configuration; not used by this query.
    reference_time : datetime
        End of the search window; the window starts six hours earlier.

    Returns
    -------
    list[list[int]]
        One group holding the ids of all matching files, ordered by ``date_obs``.
        Returns an empty list when nothing matches, so no empty group is handed
        to the scheduler. (Assumes the scheduler builds one flow per returned
        group — confirm against ``generic_scheduler_flow_logic``.)
    """
    logger = get_run_logger()
    start_time = reference_time - timedelta(hours=6)
    logger.info(f"Looking for Level 3 CTM files between {start_time} and {reference_time}.")
    ready_files = (session.query(File)
                   .filter(File.state == "created")
                   .filter(File.date_obs >= start_time)
                   # The output product is stamped date_obs=reference_time, so
                   # do not pull in observations taken after it.
                   .filter(File.date_obs <= reference_time)
                   .filter(File.level == "3")
                   .filter(File.file_type == "CT")
                   .filter(File.observatory == "M")
                   # Deterministic, time-ordered input list for velocity tracking.
                   .order_by(File.date_obs)
                   .all())
    logger.info(f"{len(ready_files)} CTM files found for the velocity tracking.")
    if not ready_files:
        # Returning [[]] would hand the scheduler a group with no inputs.
        return []
    return [[f.file_id for f in ready_files]]
@task
def level3_vam_construct_flow_info(level3_ctm_files: List[File], level3_velocity_file: File,
                                   pipeline_config: dict, reference_time: datetime, session=None):
    """Build the planned ``L3_VAM`` Flow record for a group of CTM input files.

    Parameters
    ----------
    level3_ctm_files : List[File]
        Input CTM files; their ``filename()`` values go into the call data.
    level3_velocity_file : File
        Planned output file; not used here.
    pipeline_config : dict
        Pipeline configuration; the initial priority is read from
        ``pipeline_config["flows"]["L3_VAM"]["priority"]["initial"]``.
    reference_time : datetime
        Stored as a string under ``"reference_time"`` in the call data.
    session
        Unused.

    Returns
    -------
    Flow
        A level-3 flow in the ``"planned"`` state whose ``call_data`` is a JSON
        object with ``"files"`` and ``"reference_time"`` keys.
    """
    kind = "L3_VAM"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][kind]["priority"]["initial"]
    payload = {
        "files": [ctm.filename() for ctm in level3_ctm_files],
        "reference_time": str(reference_time),
    }
    return Flow(
        flow_type=kind,
        state="planned",
        flow_level="3",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
@task
def level3_vam_construct_file_info(level3_files: List[File], pipeline_config: dict, reference_time: datetime):
    """Describe the single VAM output file produced by an ``L3_VAM`` flow.

    Parameters
    ----------
    level3_files : List[File]
        Input files; not used here.
    pipeline_config : dict
        Supplies ``file_version`` for the output record.
    reference_time : datetime
        Used as the output file's ``date_obs``.

    Returns
    -------
    list[File]
        A one-element list: a level-3 ``"VA"`` file for observatory ``"M"`` in
        the ``"planned"`` state, tagged with the running punchpipe version.
    """
    velocity_product = File(
        level="3",
        file_type="VA",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=reference_time,
        state="planned",
    )
    return [velocity_product]
@flow
def level3_vam_scheduler_flow(pipeline_config_path=None, session=None, reference_time: datetime | None = None):
    """Schedule ``L3_VAM`` velocity flows.

    Wires the VAM query, file-info and flow-info tasks into
    ``generic_scheduler_flow_logic`` with ``update_input_file_state=False``
    (presumably leaving the input CTM files' state untouched — confirm in the
    scheduler helper).

    Parameters
    ----------
    pipeline_config_path
        Path to the pipeline configuration, forwarded to the scheduler helper.
    session
        Optional database session, forwarded to the scheduler helper.
    reference_time : datetime | None
        Scheduling reference time; defaults to the current UTC time.
    """
    if not reference_time:
        reference_time = datetime.now(UTC)
    query_task = level3_vam_query_ready_files
    file_info_task = level3_vam_construct_file_info
    flow_info_task = level3_vam_construct_flow_info
    generic_scheduler_flow_logic(
        query_task,
        file_info_task,
        flow_info_task,
        pipeline_config_path,
        update_input_file_state=False,
        reference_time=reference_time,
        session=session,
    )
def level3_vam_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Resolve the bare file names in ``call_data["files"]`` to full paths.

    The names are expanded with ``file_name_to_full_path`` relative to
    ``pipeline_config['root']``. ``call_data`` is modified in place and also
    returned. ``session`` is unused.
    """
    bare_names = call_data['files']
    call_data['files'] = file_name_to_full_path(bare_names, pipeline_config['root'])
    return call_data
@flow
def level3_vam_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Execute the ``L3_VAM`` flow with id ``flow_id``.

    Delegates to ``generic_process_flow_logic`` with ``track_velocity`` as the
    processing function. ``level3_vam_call_data_processor`` turns the stored
    file names into full paths first.
    """
    path_resolver = level3_vam_call_data_processor
    generic_process_flow_logic(
        flow_id,
        track_velocity,
        pipeline_config_path,
        session=session,
        call_data_processor=path_resolver,
    )
@task
def level3_van_query_ready_files(session, pipeline_config: dict, reference_time: datetime):
    """Find Level 3 CNN files to feed a VAN velocity-tracking flow.

    Selects ``File`` rows with ``state == "created"``, ``level == "3"``,
    ``file_type == "CN"`` and ``observatory == "N"`` whose ``date_obs`` lies in
    the six-hour window ending at ``reference_time`` (both ends inclusive).

    Parameters
    ----------
    session
        Database session used to query the ``File`` table.
    pipeline_config : dict
        Pipeline configuration; not used by this query.
    reference_time : datetime
        End of the search window; the window starts six hours earlier.

    Returns
    -------
    list[list[int]]
        One group holding the ids of all matching files, ordered by ``date_obs``.
        Returns an empty list when nothing matches, so no empty group is handed
        to the scheduler. (Assumes the scheduler builds one flow per returned
        group — confirm against ``generic_scheduler_flow_logic``.)
    """
    logger = get_run_logger()
    start_time = reference_time - timedelta(hours=6)
    logger.info(f"Looking for Level 3 CNN files between {start_time} and {reference_time}.")
    ready_files = (session.query(File)
                   .filter(File.state == "created")
                   .filter(File.date_obs >= start_time)
                   # The output product is stamped date_obs=reference_time, so
                   # do not pull in observations taken after it.
                   .filter(File.date_obs <= reference_time)
                   .filter(File.level == "3")
                   .filter(File.file_type == "CN")
                   .filter(File.observatory == "N")
                   # Deterministic, time-ordered input list for velocity tracking.
                   .order_by(File.date_obs)
                   .all())
    logger.info(f"{len(ready_files)} CNN files found for the velocity tracking.")
    if not ready_files:
        # Returning [[]] would hand the scheduler a group with no inputs.
        return []
    return [[f.file_id for f in ready_files]]
@task
def level3_van_construct_flow_info(level3_cnn_files: List[File], level3_velocity_file: File,
                                   pipeline_config: dict, reference_time: datetime, session=None):
    """Build the planned ``L3_VAN`` Flow record for a group of CNN input files.

    Parameters
    ----------
    level3_cnn_files : List[File]
        Input CNN files; their ``filename()`` values go into the call data.
    level3_velocity_file : File
        Planned output file; not used here.
    pipeline_config : dict
        Pipeline configuration; the initial priority is read from
        ``pipeline_config["flows"]["L3_VAN"]["priority"]["initial"]``.
    reference_time : datetime
        Stored as a string under ``"reference_time"`` in the call data.
    session
        Unused.

    Returns
    -------
    Flow
        A level-3 flow in the ``"planned"`` state whose ``call_data`` is a JSON
        object with ``"files"`` and ``"reference_time"`` keys.
    """
    kind = "L3_VAN"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][kind]["priority"]["initial"]
    payload = {
        "files": [cnn.filename() for cnn in level3_cnn_files],
        "reference_time": str(reference_time),
    }
    return Flow(
        flow_type=kind,
        state="planned",
        flow_level="3",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
@task
def level3_van_construct_file_info(level3_files: List[File], pipeline_config: dict, reference_time: datetime):
    """Describe the single VAN output file produced by an ``L3_VAN`` flow.

    Parameters
    ----------
    level3_files : List[File]
        Input files; not used here.
    pipeline_config : dict
        Supplies ``file_version`` for the output record.
    reference_time : datetime
        Used as the output file's ``date_obs``.

    Returns
    -------
    list[File]
        A one-element list: a level-3 ``"VA"`` file for observatory ``"N"`` in
        the ``"planned"`` state, tagged with the running punchpipe version.
    """
    velocity_product = File(
        level="3",
        file_type="VA",
        observatory="N",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=reference_time,
        state="planned",
    )
    return [velocity_product]
@flow
def level3_van_scheduler_flow(pipeline_config_path=None, session=None, reference_time: datetime | None = None):
    """Schedule ``L3_VAN`` velocity flows.

    Wires the VAN query, file-info and flow-info tasks into
    ``generic_scheduler_flow_logic`` with ``update_input_file_state=False``
    (presumably leaving the input CNN files' state untouched — confirm in the
    scheduler helper).

    Parameters
    ----------
    pipeline_config_path
        Path to the pipeline configuration, forwarded to the scheduler helper.
    session
        Optional database session, forwarded to the scheduler helper.
    reference_time : datetime | None
        Scheduling reference time; defaults to the current UTC time.
    """
    if not reference_time:
        reference_time = datetime.now(UTC)
    query_task = level3_van_query_ready_files
    file_info_task = level3_van_construct_file_info
    flow_info_task = level3_van_construct_flow_info
    generic_scheduler_flow_logic(
        query_task,
        file_info_task,
        flow_info_task,
        pipeline_config_path,
        update_input_file_state=False,
        reference_time=reference_time,
        session=session,
    )
def level3_van_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Resolve the bare file names in ``call_data["files"]`` to full paths.

    The names are expanded with ``file_name_to_full_path`` relative to
    ``pipeline_config['root']``. ``call_data`` is modified in place and also
    returned. ``session`` is unused.
    """
    bare_names = call_data['files']
    call_data['files'] = file_name_to_full_path(bare_names, pipeline_config['root'])
    return call_data
@flow
def level3_van_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Execute the ``L3_VAN`` flow with id ``flow_id``.

    Delegates to ``generic_process_flow_logic`` with ``track_velocity`` as the
    processing function. ``level3_van_call_data_processor`` turns the stored
    file names into full paths first.
    """
    path_resolver = level3_van_call_data_processor
    generic_process_flow_logic(
        flow_id,
        track_velocity,
        pipeline_config_path,
        session=session,
        call_data_processor=path_resolver,
    )