Source code for punchpipe.control.launcher

import asyncio
from math import ceil
from random import shuffle
from typing import List
from datetime import datetime, timedelta
from collections import defaultdict

from prefect import flow, get_run_logger, task
from prefect.cache_policies import NO_CACHE
from prefect.client import get_client
from prefect.variables import Variable
from sqlalchemy import and_, func, select, update
from sqlalchemy.orm import Session

from punchpipe.control.db import File, Flow
from punchpipe.control.util import batched, get_database_session, load_pipeline_configuration


[docs] @task(cache_policy=NO_CACHE) def gather_planned_flows(session, weight_to_launch, max_flows_to_launch, flow_weights, flow_enabled, flow_batch_sizes): # We'll have to grab a bunch of possible flows to launch from the DB, and then on our end apply the weights and the # maximum-weight limit. But we can use the smallest weight to set an upper bound on how many launchable flows to # retrieve. enabled_flows = [flow for flow, enabled in flow_enabled.items() if enabled] if enabled_flows: max_to_select = weight_to_launch / min([flow_weights[k] for k in enabled_flows]) else: max_to_select = 0 flows = (session.query(Flow) .where(Flow.state == "planned") .where(Flow.flow_type.in_(enabled_flows)) .order_by(Flow.is_backprocessing.asc(), Flow.priority.desc(), Flow.creation_time.asc()) .limit(max_to_select).all()) selected_flows = [] selected_weight = 0 selected_number = 0 count_per_type = defaultdict(lambda: 0) while selected_weight < weight_to_launch and selected_number < max_flows_to_launch and len(flows): flow = flows.pop(0) if not flow_enabled[flow.flow_type]: continue selected_flows.append(flow) selected_weight += flow_weights[flow.flow_type] selected_number += 1 / flow_batch_sizes[flow.flow_type] count_per_type[flow.flow_type] += 1 select_flow_ids = [flow.flow_id for flow in selected_flows] output_files = session.query(File).where(File.processing_flow.in_(select_flow_ids)).all() tags_by_flow = {} for flow in selected_flows: tags = set() for output_file in output_files: if output_file.processing_flow == flow.flow_id: tags.add(output_file.file_type + output_file.observatory) tags_by_flow[flow.flow_id] = sorted(tags) number_of_flows = len(selected_flows) batched_flows = [] for flow_type, batch_size in flow_batch_sizes.items(): if batch_size <= 1: continue these_flows, other_flows = [], [] for flow in selected_flows: if flow.flow_type == flow_type: these_flows.append(flow) else: other_flows.append(flow) batched_flows.extend(batched(these_flows, batch_size)) selected_flows = other_flows selected_flows = [[f] for f in selected_flows] + batched_flows return selected_flows, tags_by_flow, selected_weight, number_of_flows, count_per_type
[docs] @task(cache_policy=NO_CACHE) def count_flows(session, weights): n_planned, n_running = 0, 0 weight_planned, weight_running = 0, 0 rows = session.execute( select(Flow.state, Flow.flow_type, func.count()) .select_from(Flow) .where(Flow.state.in_(("planned", "running", "launched"))) .group_by(Flow.state, Flow.flow_type) ).all() # We won't get results for states that aren't actually in the database, so we have to inspect the returned rows for state, flow_type, count in rows: if state == "planned": n_planned += count weight_planned += count * weights[flow_type] else: n_running += count weight_running += count * weights[flow_type] return n_running, n_planned, weight_planned, weight_running
[docs] @task(cache_policy=NO_CACHE) def escalate_long_waiting_flows(session, pipeline_config): for flow_type in pipeline_config["flows"]: for max_seconds_waiting, escalated_priority in zip( pipeline_config["flows"][flow_type]["priority"]["seconds"], pipeline_config["flows"][flow_type]["priority"]["escalation"], ): since = datetime.now() - timedelta(seconds=max_seconds_waiting) session.query(Flow).where( and_(Flow.priority < escalated_priority, Flow.state == "planned", Flow.creation_time < since, Flow.flow_type == flow_type) ).update({"priority": escalated_priority}) # Commit after every update to try to avoid deadlocks. I think the problem scenario is (1) we've updated # a flow's priority, so we have that row locked until we commit, (2) the flow gets launched, so another # process wants to update that flow record and so has a pending transaction on that row, and (3) we # continue going through our loops, where each update call has to lock the whole table so it can scan # every record, and so it has to wait for the other process, which has its pending lock/update session.commit()
[docs] def determine_launchable_flow_count(weight_planned, weight_running, max_weight_running, max_weight_to_launch, max_flows_to_launch): logger = get_run_logger() amount_to_launch = max_weight_running - weight_running logger.info(f"Total weight {amount_to_launch:.2f} can be launched at this time.") amount_to_launch = min(amount_to_launch, max_weight_to_launch) amount_to_launch = max(0, amount_to_launch) logger.info(f"Will launch up to {amount_to_launch:.2f} weight and {max_flows_to_launch} flows") return min(amount_to_launch, weight_planned), max_flows_to_launch
[docs] @task(cache_policy=NO_CACHE) async def launch_ready_flows(session: Session, flow_info: List[List[Flow]], tags_by_flow: dict[int, str], pipeline_config: dict) -> None: """Given a list of ready-to-launch flow_ids, this task creates flow runs in Prefect for them. These flow runs are automatically marked as scheduled in Prefect and will be picked up by a work queue and agent as soon as possible. Parameters ---------- session : sqlalchemy.orm.session.Session A SQLAlchemy session for database interactions flow_info : List[int] A list of flow IDs from the punchpipe database identifying which flows to launch Returns ------- A list of responses from Prefect about the flow runs that were created """ if not len(flow_info): return logger = get_run_logger() # gather the flow information for launching # If we don't shuffle, flows will be sorted by priority which may implicitly be a sort by flow type. This could # mean we launch all the quick flows at once and then later all the slow flows at once, but we'll get better # performance overall by mixing the fast and slow flows since the total CPU demand will be more uniform through this # scheduling window. shuffle(flow_info) async with get_client() as client: # determine the deployment ids for each kind of flow deployments = await client.read_deployments() deployment_ids = {d.name: d.id for d in deployments} # We want to stagger launches through a time window. If our configured time window is 5 minutes, we'll use 4 # full minutes, plus a portion of the fifth, aiming to end after 4m35s to leave margin so the flow is fully # finished after 5 minutes. # First we work out the remainder part, figuring out where we are relative to the 35th second of the current # minute. total_delay_time = 35 - datetime.now().second total_delay_time = max(0, total_delay_time) total_delay_time += (pipeline_config['control']['launcher']['launch_time_window_minutes'] - 1) * 60 # Launch a chunk every 10 seconds through this window n_chunks = total_delay_time // 10 n_chunks = max(n_chunks, 1) chunk_size = ceil(len(flow_info) / n_chunks) logger.info(f"Total delay time: {total_delay_time}") if chunk_size >= len(flow_info): delay_time = 0 else: delay_time = total_delay_time / (n_chunks - 1) awaitables = [] responses = [] all_chunks = list(batched(flow_info, chunk_size)) for chunk_number, chunk in enumerate(all_chunks): start = datetime.now().timestamp() for batch in chunk: for flow in batch: flow.state = "launched" flow.launch_time = datetime.now() session.commit() # Launch the chunk n_actual_flows = 0 for batch in chunk: flow_ids = [flow.flow_id for flow in batch] flow_types = set(flow.flow_type for flow in batch) assert len(flow_types) == 1 unique_tags = set() for flow in batch: unique_tags.update(tags_by_flow[flow.flow_id]) unique_tags = list(unique_tags) if len(flow_ids) > 1: unique_tags.append("batch") this_deployment_id = deployment_ids[batch[0].flow_type + "_process_flow"] parameters = {"flow_id": flow_ids[0] if len(flow_ids) == 1 else flow_ids} awaitables.append(client.create_flow_run_from_deployment( this_deployment_id, parameters=parameters, tags=unique_tags) ) n_actual_flows += len(flow_ids) responses.extend(await asyncio.gather(*awaitables)) awaitables = [] logger.info(f"Chunk {chunk_number}/{len(all_chunks)} sent, containing {n_actual_flows} flows " f"in {len(chunk)} batches") if delay_time: # Stagger the launches await asyncio.sleep(delay_time - (datetime.now().timestamp() - start)) # TODO This doesn't seem to be an effective way to check for a failed flow submission, but we should # do something like this that works ok_responses = [r for r in responses if r.name not in [None, ''] and r.state_name == 'Scheduled'] bad_responses = [r for r in responses if r not in ok_responses] if len(bad_responses): bad_flow_ids = [] for r in bad_responses: flow_id = r.parameters['flow_id'] if isinstance(flow_id, list): bad_flow_ids.extend(flow_id) else: bad_flow_ids.append(flow_id) session.execute( update(Flow) .where(Flow.state == 'launched') .where(Flow.flow_id.in_(bad_flow_ids)) .values(state='planned') ) session.commit() for r in bad_responses: logger.warning(f"Got bad response {repr(r)}")
[docs] def load_flow_data(pipeline_config): flow_weights = dict() flow_enabled = dict() flow_batch_size = dict() for flow_type in pipeline_config["flows"]: flow_enabled[flow_type] = pipeline_config["flows"][flow_type].get("enabled", True) is True flow_weights[flow_type] = pipeline_config["flows"][flow_type].get("launch_weight", 1) flow_batch_size[flow_type] = pipeline_config["flows"][flow_type].get("batch_size", 1) return flow_weights, flow_enabled, flow_batch_size
[docs] @flow async def launcher(pipeline_config_path=None): """The main launcher flow for Prefect, responsible for identifying flows, based on priority, that are ready to run and creating flow runs for them. It also escalates long-waiting flows' priorities. See EM 41 or the internal requirements document for more details Returns ------- Nothing """ logger = get_run_logger() if pipeline_config_path is None: pipeline_config_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml") pipeline_config = load_pipeline_configuration(pipeline_config_path) flow_weights, flow_enabled, flow_batch_sizes = load_flow_data(pipeline_config) logger.info(f"Enabled flows: {', '.join([flow for flow, enabled in flow_enabled.items() if enabled])}") logger.info("Establishing database connection") session = get_database_session() escalate_long_waiting_flows(session, pipeline_config) # Perform the launcher flow responsibilities num_running_flows, num_planned_flows, weight_planned, weight_running = count_flows(session, flow_weights) logger.info(f"There are {num_running_flows} flows running right now (weight {weight_running:.2f}) and {num_planned_flows} planned flows (weight {weight_planned:.2f}).") max_weight_running = pipeline_config["control"]["launcher"]["max_weight_running"] max_weight_to_launch = pipeline_config["control"]["launcher"]["max_weight_to_launch_at_once"] max_flows_to_launch = pipeline_config["control"]["launcher"]["max_flows_to_launch_at_once"] weight_to_launch, max_flows_to_launch = determine_launchable_flow_count( weight_planned, weight_running, max_weight_running, max_weight_to_launch, max_flows_to_launch) flows_to_launch, tags_by_flow, selected_weight, number_of_flows, counts_per_type = gather_planned_flows( session, weight_to_launch, max_flows_to_launch, flow_weights, flow_enabled, flow_batch_sizes) ids = [[flow.flow_id for flow in batch] for batch in flows_to_launch] logger.info(f"{number_of_flows} flows (weight {selected_weight:.2f}) with IDs of {ids} will be launched.") counts = [f"{counts_per_type[type]} {type}" for type in sorted(counts_per_type.keys())] if len(counts): logger.info("This consists of " + ", ".join(counts)) await launch_ready_flows(session, flows_to_launch, tags_by_flow, pipeline_config) logger.info("Launcher flow exit.")