Source code for punchpipe.cli

import os
import time
import inspect
import argparse
import traceback
import subprocess
from pathlib import Path
from datetime import datetime, timedelta
from importlib import import_module

import pandas as pd
from prefect import Flow, get_client, serve
from prefect.client.schemas.objects import ConcurrencyLimitConfig, ConcurrencyLimitStrategy
from prefect.variables import Variable

from punchpipe.control.util import load_pipeline_configuration

THIS_DIR = os.path.dirname(__file__)

def main():
    """Run the PUNCH automated pipeline.

    Parses the command line and dispatches to the selected sub-command:
    ``run`` (launch everything), ``serve-data`` (serve the data-processing
    flows) or ``serve-control`` (serve the control flows). When no
    sub-command is supplied the usage help is printed instead.
    """
    cli = argparse.ArgumentParser(prog='punchpipe')
    commands = cli.add_subparsers(dest="command")

    # Sub-commands are registered in this order so the help output is stable.
    run_cmd = commands.add_parser('run', help="Run the pipeline.")
    control_cmd = commands.add_parser('serve-control', help="Serve the control flows.")
    data_cmd = commands.add_parser('serve-data', help="Serve the data-processing flows.")

    run_cmd.add_argument("config", type=str, help="Path to config.")
    run_cmd.add_argument("--launch-prefect", action="store_true", help="Launch the prefect server")
    run_cmd.add_argument("--no-dask-cluster", action="store_true", help="Skip launching the dask cluster")

    control_cmd.add_argument("config", type=str, help="Path to config.")
    data_cmd.add_argument("config", type=str, help="Path to config.")

    options = cli.parse_args()
    chosen = options.command

    if chosen == 'run':
        # The CLI flag is negative ("--no-dask-cluster"); run() takes the positive form.
        run(options.config, options.launch_prefect, not options.no_dask_cluster)
        return
    if chosen == 'serve-data':
        run_data(options.config)
        return
    if chosen == 'serve-control':
        run_control(options.config)
        return

    cli.print_help()
def find_flow(target_flow, subpackage="flows") -> Flow:
    """Look up an object by name among the modules of a punchpipe subpackage.

    Every ``.py`` file found in ``<this package>/<subpackage>`` is imported as
    ``punchpipe.<subpackage>.<module>`` and its members are searched for one
    whose name equals ``target_flow``. The first match is returned.

    Parameters
    ----------
    target_flow
        Name of the member to find.
    subpackage
        Subpackage directory (relative to this file) to search.

    Raises
    ------
    RuntimeError
        If no module in the subpackage has a member with that name.
    """
    package_dir = os.path.join(THIS_DIR, subpackage)

    for entry in os.listdir(package_dir):
        if not entry.endswith(".py"):
            continue

        stem = os.path.splitext(entry)[0]
        candidate = import_module(f"punchpipe.{subpackage}." + stem)

        for member_name, member in inspect.getmembers(candidate):
            if member_name == target_flow:
                return member

    raise RuntimeError(f"No flow found for {target_flow}")
def construct_flows_to_serve(configuration_path, include_data=True, include_control=True):
    """Build the list of deployments described by a pipeline configuration.

    For each entry under ``config["flows"]`` (when ``include_data`` is true)
    two deployments are created: ``<name>_scheduler_flow`` and
    ``<name>_process_flow``. For each entry under ``config["control"]`` (when
    ``include_control`` is true) a single deployment is created.

    Parameters
    ----------
    configuration_path
        Path to the pipeline configuration; also passed to every deployment as
        the ``pipeline_config_path`` parameter.
    include_data
        Whether to include the scheduler/process deployments.
    include_control
        Whether to include the control deployments.

    Returns
    -------
    list
        The deployments, in configuration order, data flows first.
    """
    config = load_pipeline_configuration(configuration_path)
    deployments = []

    if include_data:
        for flow_name in config["flows"]:
            # Scheduler variant: at most one concurrent run, extra runs are cancelled.
            scheduler_name = flow_name + "_scheduler_flow"
            settings = config["flows"][flow_name]
            extra_tags = settings.get("tags", [])
            summary = settings.get("description", "")

            scheduler_flow = find_flow(scheduler_name)
            scheduler_limit = ConcurrencyLimitConfig(
                limit=1,
                collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
            )
            deployments.append(
                scheduler_flow.to_deployment(
                    name=scheduler_name,
                    description="Scheduler: " + summary,
                    tags=["scheduler"] + extra_tags,
                    cron=settings.get("schedule", None),
                    concurrency_limit=scheduler_limit,
                    parameters={"pipeline_config_path": configuration_path},
                )
            )

            # Process variant: optional configured limit, extra runs are queued.
            process_name = flow_name + "_process_flow"
            process_flow = find_flow(process_name)
            configured_limit = settings.get("concurrency_limit", None)
            if configured_limit:
                process_limit = ConcurrencyLimitConfig(
                    limit=configured_limit,
                    collision_strategy=ConcurrencyLimitStrategy.ENQUEUE,
                )
            else:
                process_limit = None
            deployments.append(
                process_flow.to_deployment(
                    name=process_name,
                    description="Process: " + summary,
                    tags=["process"] + extra_tags,
                    parameters={"pipeline_config_path": configuration_path},
                    concurrency_limit=process_limit,
                )
            )

    if include_control:
        # Control flows manage the pipeline itself rather than processing data.
        for flow_name in config["control"]:
            control_flow = find_flow(flow_name, "control")
            control_limit = ConcurrencyLimitConfig(
                limit=1,
                collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
            )
            control_settings = config["control"][flow_name]
            deployments.append(
                control_flow.to_deployment(
                    name=flow_name,
                    description=control_settings.get("description", ""),
                    tags=["control"],
                    # Control flows default to running every minute.
                    cron=control_settings.get("schedule", "* * * * *"),
                    parameters={"pipeline_config_path": configuration_path},
                    concurrency_limit=control_limit,
                )
            )

    return deployments
def run_data(configuration_path):
    """Serve the data-processing flows for the given configuration.

    Before serving, tag-based concurrency limits of 50 are created for the
    ``reproject`` and ``image_loader`` tags. The configuration path is
    resolved to an absolute path before it is handed to the deployments.
    """
    with get_client(sync_client=True) as client:
        for limited_tag in ("reproject", "image_loader"):
            client.create_concurrency_limit(tag=limited_tag, concurrency_limit=50)

    resolved_path = str(Path(configuration_path).resolve())
    deployments = construct_flows_to_serve(resolved_path, include_control=False, include_data=True)
    serve(*deployments)
def run_control(configuration_path):
    """Serve the control flows for the given configuration.

    The configuration path is resolved to an absolute path before it is
    handed to the deployments.
    """
    resolved_path = str(Path(configuration_path).resolve())
    deployments = construct_flows_to_serve(resolved_path, include_control=True, include_data=False)
    serve(*deployments)
def _stop_processes(*processes):
    """Terminate every started process, then wait for each of them to exit.

    ``None`` entries (processes that were never launched) are skipped.
    """
    started = [process for process in processes if process is not None]
    for process in started:
        process.terminate()
    for process in started:
        process.wait()


def run(configuration_path, launch_prefect=False, launch_dask_cluster=False):
    """Launch the pipeline's processes and supervise them until shutdown.

    Starts (optionally) the Prefect server and its background services,
    (optionally) the dask cluster, the gunicorn-hosted monitor, and the
    ``punchpipe serve-data`` / ``punchpipe serve-control`` processes. It then
    polls them every 10 seconds: the data and control processes are restarted
    if they exit, whereas an exit of the Prefect or cluster processes ends the
    run. All launched processes are terminated on the way out.

    Parameters
    ----------
    configuration_path
        Path to the pipeline configuration; resolved to an absolute path.
    launch_prefect
        Whether to start the Prefect server and services.
    launch_dask_cluster
        Whether to start the dask cluster via ``punchpipe_cluster``.
    """
    now = datetime.now()

    configuration_path = str(Path(configuration_path).resolve())
    output_path = f"punchpipe_{now.strftime('%Y%m%d_%H%M%S')}.txt"

    print()
    print(f"Launching punchpipe at {now} with configuration: {configuration_path}")
    print(f"Terminal logs from punchpipe are in {output_path}")

    with open(output_path, "a") as f:
        shutdown_expected = False
        # Every handle starts as None so the cleanup in `finally` is safe even
        # when start-up fails part-way through.
        prefect_process = None
        prefect_services_process = None
        cluster_process = None
        monitor_process = None
        data_process = None
        control_process = None
        try:
            # NOTE(review): the CPU ranges are hard-coded; presumably they match
            # the deployment host's topology -- confirm before running elsewhere.
            numa_prefix_control = ['numactl', '--localalloc', '--physcpubind=0-11']
            numa_prefix_workers = ['numactl', '--localalloc', '--physcpubind=12-63,64-125,192-255']

            if launch_prefect:
                print("Launcing prefect")
                prefect_process = subprocess.Popen(
                    [*numa_prefix_control, "prefect", "server", "start", "--no-services"], stdout=f, stderr=f)
                time.sleep(5)
                # Separating the server and the background services may help avoid overwhelming the database
                # connections
                # https://github.com/PrefectHQ/prefect/issues/16299#issuecomment-2698732783
                prefect_services_process = subprocess.Popen(
                    [*numa_prefix_control, "prefect", "server", "services", "start"], stdout=f, stderr=f)

            if launch_dask_cluster:
                cluster_process = subprocess.Popen(
                    [*numa_prefix_workers, 'punchpipe_cluster', configuration_path], stdout=f, stderr=f)

            monitor_process = subprocess.Popen(
                [*numa_prefix_control, "gunicorn", "-b", "0.0.0.0:8050", "--chdir", THIS_DIR + '/monitor',
                 "app:server"],
                stdout=f, stderr=f)
            time.sleep(1)

            Variable.set("punchpipe_config", configuration_path, overwrite=True)

            # These processes send a _lot_ of output, so we let it go to the screen instead of making the
            # log file enormous
            def data_process_launcher() -> subprocess.Popen:
                return subprocess.Popen([*numa_prefix_workers, "punchpipe", "serve-data", configuration_path])

            def control_process_launcher() -> subprocess.Popen:
                return subprocess.Popen([*numa_prefix_control, "punchpipe", "serve-control", configuration_path])

            data_process = data_process_launcher()
            control_process = control_process_launcher()

            # Only advertise the services this invocation actually started.
            if launch_prefect:
                print("Launched Prefect dashboard on http://localhost:4200/")
            print("Launched punchpipe monitor on http://localhost:8050/")
            if cluster_process is not None:
                print("Launched dask cluster on http://localhost:8786/")
                print("Dask dashboard available at http://localhost:8787/")
            print("Use ctrl-c to exit.")

            time.sleep(10)
            while True:
                # `.poll()` updates but does not return the object's returncode attribute
                if cluster_process is not None:
                    cluster_process.poll()
                control_process.poll()
                data_process.poll()

                if launch_prefect:
                    prefect_process.poll()
                    prefect_services_process.poll()
                    if prefect_process.returncode is not None or prefect_services_process.returncode is not None:
                        print("Prefect process exited unexpectedly")
                        break

                if cluster_process is not None and cluster_process.returncode is not None:
                    print("Cluster process exited unexpectedly")
                    break

                # Core processes are still running. Now check worker processes, which we can restart safely
                if control_process.returncode is not None:
                    print(f"Restarted control process at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
                    control_process = control_process_launcher()

                if data_process.returncode is not None:
                    print(f"Restarted data process at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
                    data_process = data_process_launcher()

                time.sleep(10)
            # Only reached via `break`: route the failure through the generic error handler below.
            raise RuntimeError("A core punchpipe process exited; shutting down")
        except KeyboardInterrupt:
            print("Shutting down.")
            shutdown_expected = True
        except Exception as e:
            print(f"Received error: {e}")
            print(traceback.format_exc())
        finally:
            # Stop the flow-serving processes first, then Prefect, then the cluster and monitor.
            _stop_processes(control_process, data_process)
            time.sleep(1)
            if launch_prefect:
                _stop_processes(prefect_services_process)
                time.sleep(3)
                _stop_processes(prefect_process)
                time.sleep(3)
            _stop_processes(cluster_process, monitor_process)
            print()
            if shutdown_expected:
                print("punchpipe safely shut down")
            else:
                print("punchpipe abruptly shut down")
def clean_replay(input_file: str, configuration_path, write: bool = True, window_in_days: int | None = None, reference_date : None | datetime = None) -> None | pd.DataFrame:
    """Clean replay requests.

    Reads replay requests from a CSV file, optionally restricts them to a time
    window ending at ``reference_date``, keeps only requests whose start block
    lies inside the configured science block range, and merges requests that
    overlap.

    Parameters
    ----------
    input_file
        CSV file to read. The code uses its ``start_block``, ``replay_length``
        and ``start_time`` columns.
    configuration_path
        Passed to ``load_pipeline_configuration``. The loaded configuration
        must provide ``config['replay']['window_in_days']`` and
        ``config['replay']['science_blocks']`` (indexed at ``[0]`` and ``[1]``
        as the lower and upper block bounds).
    write
        When true, the merged requests are written next to the input file as
        ``merged_soc_<input name>`` (CSV) and ``merged_<input stem>.txt`` (one
        command per request) and nothing is returned. When false, nothing is
        written and the merged requests are returned.
    window_in_days
        Length of the time window in days. When ``None`` the configured value
        is used; if that is also ``None`` no time filtering is applied.
    reference_date
        End of the time window; defaults to ``datetime.now()``.

    Returns
    -------
    None | pd.DataFrame
        ``None`` when ``write`` is true; otherwise the merged requests, sorted
        by ``start_time`` (an empty, column-less frame if nothing remains).
    """
    if reference_date is None:
        reference_date = datetime.now()

    config = load_pipeline_configuration(configuration_path)

    # Both output files are placed in the same directory as the input file.
    input_path = Path(input_file)
    output_file = input_path.parent / f"merged_{input_path.name}"
    output_file_soc = input_path.parent / f"merged_soc_{input_path.name}"

    df = pd.read_csv(input_file)
    # Sorting by start block lets each request be compared against the most
    # recently recorded merged block below.
    df = df.sort_values('start_block').reset_index(drop=True)

    # An explicit argument takes precedence over the configured window.
    if window_in_days is None:
        window_in_days = config['replay']['window_in_days']

    if window_in_days is not None:
        # Keep requests with reference_date - window <= start_time <= reference_date.
        df = df[pd.to_datetime(df['start_time']) >= (reference_date - timedelta(days=window_in_days))]
        df = df[pd.to_datetime(df['start_time']) <= reference_date]

    # Drop requests that start outside the [first, last] science block range.
    blocks_science = config['replay']['science_blocks']
    df = df[df['start_block'] >= blocks_science[0]]
    df = df[df['start_block'] <= blocks_science[1]]

    merged_blocks = []
    for _, row in df.iterrows():
        start = row['start_block']
        length = row['replay_length']
        # NOTE: `end` is computed from the raw length, so for a negative
        # length it is smaller than `start`.
        end = start + length
        start_time = row['start_time']

        # A negative length is treated as a replay that wraps around the
        # science block range; the stored length is shifted by the range size.
        if length < 0:
            length = length + blocks_science[1] - blocks_science[0] + 1
            wrapped_replay = True
        else:
            wrapped_replay = False

        if merged_blocks and (start <= merged_blocks[-1]['end']):
            # Starts at or before the end of the last merged block: extend that block.
            last_block = merged_blocks[-1]
            print(f"Overlap found: Block {start}-{end} overlaps with {last_block['start_block']}-{last_block['end']}")
            new_end = max(last_block['end'], end)
            new_length = new_end - last_block['start_block'] + 1
            merged_blocks[-1]['end'] = new_end
            merged_blocks[-1]['replay_length'] = new_length
            print(f"Merged into: Block {last_block['start_block']}-{new_end} (length: {new_length})")
        elif merged_blocks and wrapped_replay and (end >= merged_blocks[0]['start_block']):
            # A wrapped replay whose end reaches the first merged block: fold it
            # into that block, which now starts at the wrapped replay's start.
            first_block = merged_blocks[0]
            print(f"Overlap found: Block {start}-{end} overlaps with {first_block['start_block']}-{first_block['end']}")
            new_end = max(first_block['end'], end)
            new_length = new_end - start + blocks_science[1] - blocks_science[0] + 1
            merged_blocks[0]['start_block'] = start
            merged_blocks[0]['end'] = new_end
            merged_blocks[0]['replay_length'] = new_length
            print(f"Merged into: Block {first_block['start_block']}-{new_end} (length: {new_length})")
        else:
            # No overlap: record the request as a new block.
            merged_blocks.append({
                'start_time': start_time,
                'start_block': start,
                'replay_length': length,
                'end': end
            })

    if len(merged_blocks) != 0:
        # 'end' is only bookkeeping for the merge and is not part of the output.
        result_df = pd.DataFrame(merged_blocks).drop('end', axis=1)
        print(f"\nOriginal blocks: {len(df)}")
        print(f"Merged blocks: {len(result_df)}")
        print(f"Blocks merged: {len(df) - len(result_df)}")
        result_df = result_df.sort_values('start_time').reset_index(drop=True)
    else:
        result_df = pd.DataFrame([])

    if write:
        result_df.to_csv(output_file_soc, index=False)
        print(f"Results written to {output_file_soc}")
        # The companion .txt file holds one replay command line per merged request.
        with open(output_file.with_suffix(".txt"), 'w') as f:
            for _, row in result_df.iterrows():
                f.write(f"start mops_fsw_start_fast_replay(xfi,{row['start_block']},{row['replay_length']})\n")
    else:
        return result_df