Source code for punchpipe.control.processor

import os
import json
from datetime import UTC, datetime

from dateutil.parser import parse as parse_datetime_str
from prefect import get_run_logger, tags
from prefect.context import MissingContextError, get_run_context

from punchpipe.control.db import File, Flow
from punchpipe.control.util import (
    get_database_session,
    load_pipeline_configuration,
    match_data_with_file_db_entry,
    write_file,
)


[docs] def generic_process_flow_logic(flow_id: int | list[int], core_flow_to_launch, pipeline_config_path: str, session=None, call_data_processor=None, ): if session is None: session = get_database_session() if isinstance(flow_id, int): flow_ids = [flow_id] else: flow_ids = flow_id try: logger = get_run_logger() logger.info(f"Running under PID {os.getpid()}") # load pipeline configuration pipeline_config = load_pipeline_configuration(pipeline_config_path) flow_db_entries = session.query(Flow).where(Flow.flow_id.in_(flow_ids)).all() if len(flow_db_entries) != len(flow_ids): raise RuntimeError("Did not find the right number of flows") file_db_entry_lists = [] for flow_db_entry in flow_db_entries: if flow_db_entry.state != "launched": raise RuntimeError(f"Flow id {flow_db_entry.flow_id} has state '{flow_db_entry.state}'; not running") logger.info(f"Will be running on flow db entry with id={flow_db_entry.flow_id}, scheduled at " f"{flow_db_entry.creation_time} and launched at {flow_db_entry.launch_time}.") # update the processing flow name with the flow run name from Prefect try: flow_run_context = get_run_context() flow_db_entry.flow_run_name = flow_run_context.flow_run.name flow_db_entry.flow_run_id = flow_run_context.flow_run.id except MissingContextError: # We're not in a flow context---probably we're running under speedster pass flow_db_entry.state = "running" flow_db_entry.start_time = datetime.now() file_db_entry_list = session.query(File).where(File.processing_flow == flow_db_entry.flow_id).all() # update the file database entries as being created try: if file_db_entry_list: for file_db_entry in file_db_entry_list: if file_db_entry.state != "planned": raise RuntimeError(f"File id {file_db_entry.file_id} has already been created.") if os.path.exists(os.path.join( file_db_entry.directory(pipeline_config['root']), file_db_entry.filename())): raise RuntimeError(f"Expected output file {file_db_entry.filename()} (id {file_db_entry.file_id}) " "already exists on disk") file_db_entry.state = "creating" else: raise RuntimeError("There should be at least one file associated with this flow. Found 0.") except: # The exception handler rolls back the transaction, but we do want our start_time to stay in place. So # commit on error, but otherwise let the transaction keep growing into a big batch session.commit() raise file_db_entry_lists.append(file_db_entry_list) session.commit() for flow_db_entry, file_db_entry_list in zip(flow_db_entries, file_db_entry_lists): # load the call data and launch the core flow flow_call_data = json.loads(flow_db_entry.call_data) if call_data_processor is not None: flow_call_data = call_data_processor(flow_call_data, pipeline_config, session) output_file_ids = set() expected_file_ids = {entry.file_id for entry in file_db_entry_list} logger.info(f"Expecting to output files with ids={expected_file_ids}.") tag_set = {entry.file_type + entry.observatory for entry in file_db_entry_list} with tags(*sorted(tag_set)): results = core_flow_to_launch(**flow_call_data) for result in results: result.meta['FILEVRSN'] = pipeline_config["file_version"] file_db_entry = match_data_with_file_db_entry(result, file_db_entry_list) logger.info(f"Preparing to write {file_db_entry.file_id}.") output_file_ids.add(file_db_entry.file_id) file_db_entry.date_obs = parse_datetime_str(result.meta['DATE-OBS'].value) file_db_entry.state = "created" date_created = parse_datetime_str(result.meta['DATE'].value) # Converts to local time date_created = date_created.replace(tzinfo=UTC).astimezone() file_db_entry.date_created = date_created result.meta['OUTLIER'] = int(file_db_entry.outlier) result.meta['BADPKTS'] = int(file_db_entry.bad_packets) filename = write_file(result, file_db_entry, pipeline_config) logger.info(f"Wrote to {filename}") missing_file_ids = expected_file_ids.difference(output_file_ids) if missing_file_ids: raise RuntimeError(f"We did not get an output cube for file ids {missing_file_ids}") session.commit() for flow_db_entry, file_db_entry_list in zip(flow_db_entries, file_db_entry_lists): # Don't overwrite if our flow state has been changed from under us (e.g. it's been changed to 'revivable') session.refresh(flow_db_entry) if flow_db_entry.state == 'running': flow_db_entry.state = "completed" flow_db_entry.end_time = datetime.now() # Note: the file_db_entry gets updated above in the writing step because it could be created or blank session.commit() except: session.rollback() session.query(Flow).filter(Flow.flow_id.in_(flow_ids)).update( {"state": "failed", "end_time": datetime.now()}) session.query(File).filter(File.processing_flow.in_(flow_ids)).update( {"state": "failed"}) session.commit() raise