Source code for punchpipe.cluster

"""
This process starts a Dask cluster and then monitors for changes to the cluster size in the configuration file
"""
import os
import time
import argparse
import traceback
from pathlib import Path

from dask.distributed import LocalCluster

from punchpipe.control.util import load_pipeline_configuration


[docs] def main(): """Run a Dask cluster for the pipeline""" parser = argparse.ArgumentParser(prog='punchpipe-cluster') parser.add_argument("config", type=str, help="Path to config.") args = parser.parse_args() configuration_path = str(Path(args.config).resolve()) config = load_pipeline_configuration(configuration_path) config_mtime = os.path.getmtime(configuration_path) cluster = LocalCluster(n_workers=config['dask_cluster']['n_workers'], threads_per_worker=config['dask_cluster']['n_threads_per_worker'], scheduler_port=8786) try: while True: time.sleep(5) if not os.path.exists(configuration_path): # In case the file is being re-written right now. (This has happened and crashed this process!) time.sleep(1) if (cur_mtime := os.path.getmtime(configuration_path)) != config_mtime: config_mtime = cur_mtime config = load_pipeline_configuration(configuration_path) # This tells the cluster to add or remove workers cluster.scale(config['dask_cluster']['n_workers']) except Exception as e: print(f"Received error: {e}") print(traceback.format_exc()) print("Stopping cluster") cluster.close()